diff --git a/senders/terminal/Cargo.toml b/senders/terminal/Cargo.toml index c134a73..b170021 100644 --- a/senders/terminal/Cargo.toml +++ b/senders/terminal/Cargo.toml @@ -12,4 +12,5 @@ serde_json = "1.0" ctrlc = "3.1.9" tungstenite = { version = "0.21.0" } url = "2.5.0" -tiny_http = "0.12.0" \ No newline at end of file +tiny_http = "0.12.0" +serde_repr = "0.1.20" \ No newline at end of file diff --git a/senders/terminal/README.md b/senders/terminal/README.md index 437be23..c2b81d0 100644 --- a/senders/terminal/README.md +++ b/senders/terminal/README.md @@ -50,4 +50,13 @@ cat dash.mpd | ./fcast -h localhost play --mime_type application/dash+xml # Set speed to double ./fcast -h localhost setspeed -s 2.0 + +# Receive keyboard events +./fcast -h localhost -s KeyDown,KeyUp listen + +# Show image playlist +cat image_playlist_example.json | ./fcast -h localhost play --mime_type application/json + +# Play from video playlist +cat image_playlist_example.json | ./fcast -h localhost play --mime_type application/json ``` diff --git a/senders/terminal/image_playlist_example.json b/senders/terminal/image_playlist_example.json new file mode 100644 index 0000000..fdc64f1 --- /dev/null +++ b/senders/terminal/image_playlist_example.json @@ -0,0 +1,14 @@ +{ + "contentType": 0, + "items": [ + { + "container": "image/jpeg", + "url": "https://upload.wikimedia.org/wikipedia/commons/thumb/9/90/Everest%2C_Himalayas.jpg/640px-Everest%2C_Himalayas.jpg" + }, + { + "container": "image/png", + "url": "https://upload.wikimedia.org/wikipedia/commons/thumb/6/62/NTV7_Testpattern.png/640px-NTV7_Testpattern.png" + } + ], + "offset": 0 +} diff --git a/senders/terminal/src/fcastsession.rs b/senders/terminal/src/fcastsession.rs index 366ab0c..7e74503 100644 --- a/senders/terminal/src/fcastsession.rs +++ b/senders/terminal/src/fcastsession.rs @@ -1,13 +1,27 @@ -use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; -use crate::{models::{PlaybackUpdateMessage, VolumeUpdateMessage, PlaybackErrorMessage, VersionMessage}, transport::Transport}; +use crate::{ + models::{v2, v3, PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage}, + transport::Transport, +}; use serde::Serialize; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] +enum ProtoVersion { + V2, + V3, +} + +#[derive(Debug, PartialEq, Eq)] enum SessionState { - Idle = 0, - WaitingForLength, - WaitingForData, + Idle, + Connected(ProtoVersion), Disconnected, } @@ -26,7 +40,13 @@ pub enum Opcode { SetSpeed = 10, Version = 11, Ping = 12, - Pong = 13 + Pong = 13, + Initial = 14, + PlayUpdate = 15, + SetPlaylistItem = 16, + SubscribeEvent = 17, + UnsubscribeEvent = 18, + Event = 19, } impl Opcode { @@ -46,6 +66,12 @@ impl Opcode { 11 => Opcode::Version, 12 => Opcode::Ping, 13 => Opcode::Pong, + 14 => Opcode::Initial, + 15 => Opcode::PlayUpdate, + 16 => Opcode::SetPlaylistItem, + 17 => Opcode::SubscribeEvent, + 18 => Opcode::UnsubscribeEvent, + 19 => Opcode::Event, _ => panic!("Unknown value: {}", value), } } @@ -56,26 +82,100 @@ const MAXIMUM_PACKET_LENGTH: usize = 32000; pub struct FCastSession<'a> { buffer: Vec, - bytes_read: usize, - packet_length: usize, stream: Box, - state: SessionState + state: SessionState, } impl<'a> FCastSession<'a> { pub fn new(stream: T) -> Self { - return FCastSession { + Self { buffer: vec![0; MAXIMUM_PACKET_LENGTH], - bytes_read: 0, - packet_length: 0, stream: Box::new(stream), - state: SessionState::Idle + state: SessionState::Idle, } } -} -impl FCastSession<'_> { - pub fn send_message(&mut self, opcode: Opcode, message: T) -> Result<(), Box> { + pub fn connect(stream: T) -> Result> { + let mut session = Self::new(stream); + + session.send_message( + Opcode::Version, + crate::models::VersionMessage { version: 3 }, + )?; + + let (opcode, body) = session.read_packet()?; + + if opcode != Opcode::Version { + return Err(format!("Expected Opcode::Version, got {opcode:?}").into()); + } + + let msg: VersionMessage = + serde_json::from_str(&body.ok_or("Version requires body".to_owned())?)?; + + if msg.version == 3 { + let initial = v3::InitialSenderMessage { + display_name: None, + app_name: Some(env!("CARGO_PKG_NAME").to_owned()), + app_version: Some(env!("CARGO_PKG_VERSION").to_owned()), + }; + session.send_message(Opcode::Initial, initial)?; + let (opcode, body) = session.read_packet()?; + if opcode != Opcode::Initial { + return Err(format!("Expected Opcode::Initial, got {opcode:?}").into()); + } + let inital_receiver: v3::InitialReceiverMessage = + serde_json::from_str(&body.ok_or("InitialReceiverMessage requires body")?)?; + println!("Got inital message from sender: {inital_receiver:?}"); + session.state = SessionState::Connected(ProtoVersion::V3); + } else { + session.state = SessionState::Connected(ProtoVersion::V2); + } + + Ok(session) + } + + fn read_packet(&mut self) -> Result<(Opcode, Option), Box> { + let mut header_buf = [0u8; 5]; + self.stream.transport_read_exact(&mut header_buf)?; + + let opcode = Opcode::from_u8(header_buf[4]); + let body_length = + u32::from_le_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]]) + as usize + - 1; + + if body_length > MAXIMUM_PACKET_LENGTH { + println!( + "Maximum packet length is 32kB, killing stream: {}", + body_length, + ); + + self.stream.transport_shutdown()?; + self.state = SessionState::Disconnected; + return Err(format!( + "Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", + body_length, + ) + .into()); + } + + self.stream + .transport_read_exact(&mut self.buffer[0..body_length])?; + + let body_json = if body_length > 0 { + Some(String::from_utf8(self.buffer[0..body_length].to_vec())?) + } else { + None + }; + + Ok((opcode, body_json)) + } + + pub fn send_message( + &mut self, + opcode: Opcode, + message: T, + ) -> Result<(), Box> { let json = serde_json::to_string(&message)?; let data = json.as_bytes(); let size = 1 + data.len(); @@ -83,13 +183,47 @@ impl FCastSession<'_> { let mut header = vec![0u8; header_size]; header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes()); header[LENGTH_BYTES] = opcode as u8; - + let packet = [header, data.to_vec()].concat(); - println!("Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).", packet.len(), opcode, header_size, data.len(), json); + println!( + "Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).", + packet.len(), + opcode, + header_size, + data.len(), + json + ); self.stream.transport_write(&packet)?; Ok(()) } + pub fn subscribe(&mut self, event: v3::EventType) -> Result<(), Box> { + if self.state != SessionState::Connected(ProtoVersion::V3) { + return Err(format!( + "Cannot subscribe to events in the current state ({:?})", + self.state + ) + .into()); + } + + let obj = match event { + v3::EventType::MediaItemStart => v3::EventSubscribeObject::MediaItemStart, + v3::EventType::MediaItemEnd => v3::EventSubscribeObject::MediaItemEnd, + v3::EventType::MediaItemChange => v3::EventSubscribeObject::MediaItemChanged, + v3::EventType::KeyDown => v3::EventSubscribeObject::KeyDown { + keys: v3::KeyNames::all(), + }, + v3::EventType::KeyUp => v3::EventSubscribeObject::KeyUp { + keys: v3::KeyNames::all(), + }, + }; + + self.send_message( + Opcode::SubscribeEvent, + v3::SubscribeEventMessage { event: obj }, + ) + } + pub fn send_empty(&mut self, opcode: Opcode) -> Result<(), Box> { let json = String::new(); let data = json.as_bytes(); @@ -97,129 +231,96 @@ impl FCastSession<'_> { let mut header = vec![0u8; LENGTH_BYTES + 1]; header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes()); header[LENGTH_BYTES] = opcode as u8; - + let packet = [header, data.to_vec()].concat(); self.stream.transport_write(&packet)?; Ok(()) } - pub fn receive_loop(&mut self, running: &Arc) -> Result<(), Box> { + pub fn receive_loop( + &mut self, + running: &Arc, + ) -> Result<(), Box> { println!("Start receiving."); - self.state = SessionState::WaitingForLength; - - let mut buffer = [0u8; 1024]; while running.load(Ordering::SeqCst) { - let bytes_read = self.stream.transport_read(&mut buffer)?; - self.process_bytes(&buffer[..bytes_read])?; + let (opcode, body) = self.read_packet()?; + self.handle_packet(opcode, body)?; } - self.state = SessionState::Idle; Ok(()) } - fn process_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box> { - if received_bytes.is_empty() { - return Ok(()); - } - - println!("{} bytes received", received_bytes.len()); - + pub fn send_play_message( + &mut self, + mime_type: String, + url: Option, + content: Option, + time: Option, + speed: Option, + headers: Option>, + ) -> Result<(), Box> { match self.state { - SessionState::WaitingForLength => self.handle_length_bytes(received_bytes)?, - SessionState::WaitingForData => self.handle_packet_bytes(received_bytes)?, - _ => println!("Data received is unhandled in current session state {:?}", self.state), + SessionState::Connected(ProtoVersion::V2) => { + let msg = v2::PlayMessage { + container: mime_type, + url, + content, + time, + speed, + headers, + }; + self.send_message(Opcode::Play, msg)?; + } + SessionState::Connected(ProtoVersion::V3) => { + let msg = v3::PlayMessage { + container: mime_type, + url, + content, + time, + volume: Some(1.0), + speed, + headers, + metadata: None, + }; + self.send_message(Opcode::Play, msg)?; + } + _ => return Err("invalid state for sending play message".into()), } Ok(()) } - - fn handle_length_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box> { - let bytes_to_read = std::cmp::min(LENGTH_BYTES, received_bytes.len()); - let bytes_remaining = received_bytes.len() - bytes_to_read; - self.buffer[self.bytes_read..self.bytes_read + bytes_to_read] - .copy_from_slice(&received_bytes[..bytes_to_read]); - self.bytes_read += bytes_to_read; - - println!("handleLengthBytes: Read {} bytes from packet", bytes_to_read); - - if self.bytes_read >= LENGTH_BYTES { - self.state = SessionState::WaitingForData; - self.packet_length = u32::from_le_bytes(self.buffer[..LENGTH_BYTES].try_into()?) as usize; - self.bytes_read = 0; - - println!("Packet length header received from: {}", self.packet_length); - - if self.packet_length > MAXIMUM_PACKET_LENGTH { - println!("Maximum packet length is 32kB, killing stream: {}", self.packet_length); - - self.stream.transport_shutdown()?; - self.state = SessionState::Disconnected; - return Err(format!("Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", self.packet_length).into()); - } - - if bytes_remaining > 0 { - println!("{} remaining bytes pushed to handlePacketBytes", bytes_remaining); - - self.handle_packet_bytes(&received_bytes[bytes_to_read..])?; - } - } - - Ok(()) - } - - fn handle_packet_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box> { - let bytes_to_read = std::cmp::min(self.packet_length, received_bytes.len()); - let bytes_remaining = received_bytes.len() - bytes_to_read; - self.buffer[self.bytes_read..self.bytes_read + bytes_to_read] - .copy_from_slice(&received_bytes[..bytes_to_read]); - self.bytes_read += bytes_to_read; - - println!("handlePacketBytes: Read {} bytes from packet", bytes_to_read); - - if self.bytes_read >= self.packet_length { - println!("Packet finished receiving of {} bytes.", self.packet_length); - self.handle_next_packet()?; - - self.state = SessionState::WaitingForLength; - self.packet_length = 0; - self.bytes_read = 0; - - if bytes_remaining > 0 { - println!("{} remaining bytes pushed to handleLengthBytes", bytes_remaining); - self.handle_length_bytes(&received_bytes[bytes_to_read..])?; - } - } - - Ok(()) - } - - fn handle_next_packet(&mut self) -> Result<(), Box> { - println!("Processing packet of {} bytes", self.bytes_read); - - let opcode = Opcode::from_u8(self.buffer[0]); - let packet_length = self.packet_length; - let body = if packet_length > 1 { - Some(std::str::from_utf8(&self.buffer[1..packet_length])?.to_string()) - } else { - None - }; - - println!("Received body: {:?}", body); - self.handle_packet(opcode, body) - } - - fn handle_packet(&mut self, opcode: Opcode, body: Option) -> Result<(), Box> { + fn handle_packet( + &mut self, + opcode: Opcode, + body: Option, + ) -> Result<(), Box> { println!("Received message with opcode {:?}.", opcode); match opcode { Opcode::PlaybackUpdate => { if let Some(body_str) = body { - if let Ok(playback_update_msg) = serde_json::from_str::(body_str.as_str()) { - println!("Received playback update {:?}", playback_update_msg); - } else { - println!("Received playback update with malformed body."); + match self.state { + SessionState::Connected(ProtoVersion::V2) => { + if let Ok(playback_update_msg) = + serde_json::from_str::(body_str.as_str()) + { + println!("Received playback update {:?}", playback_update_msg); + } else { + println!("Received playback update with malformed body."); + } + } + SessionState::Connected(ProtoVersion::V3) => { + if let Ok(playback_update_msg) = + serde_json::from_str::(body_str.as_str()) + { + println!("Received playback update {:?}", playback_update_msg); + } else { + println!("Received playback update with malformed body."); + } + } + _ => unreachable!(), } } else { println!("Received playback update with no body."); @@ -227,7 +328,9 @@ impl FCastSession<'_> { } Opcode::VolumeUpdate => { if let Some(body_str) = body { - if let Ok(volume_update_msg) = serde_json::from_str::(body_str.as_str()) { + if let Ok(volume_update_msg) = + serde_json::from_str::(body_str.as_str()) + { println!("Received volume update {:?}", volume_update_msg); } else { println!("Received volume update with malformed body."); @@ -238,7 +341,9 @@ impl FCastSession<'_> { } Opcode::PlaybackError => { if let Some(body_str) = body { - if let Ok(playback_error_msg) = serde_json::from_str::(body_str.as_str()) { + if let Ok(playback_error_msg) = + serde_json::from_str::(body_str.as_str()) + { println!("Received playback error {:?}", playback_error_msg); } else { println!("Received playback error with malformed body."); @@ -249,7 +354,9 @@ impl FCastSession<'_> { } Opcode::Version => { if let Some(body_str) = body { - if let Ok(version_msg) = serde_json::from_str::(body_str.as_str()) { + if let Ok(version_msg) = + serde_json::from_str::(body_str.as_str()) + { println!("Received version {:?}", version_msg); } else { println!("Received version with malformed body."); @@ -263,6 +370,31 @@ impl FCastSession<'_> { self.send_empty(Opcode::Pong)?; println!("Sent pong"); } + Opcode::Pong => println!("Received pong"), + Opcode::PlayUpdate => { + if let Some(body_str) = body { + if let Ok(play_update_msg) = + serde_json::from_str::(&body_str) + { + println!("Received play update {play_update_msg:?}"); + } else { + println!("Received play update with malformed body."); + } + } else { + println!("Received play update with no body."); + } + } + Opcode::Event => { + if let Some(body_str) = body { + if let Ok(event_msg) = serde_json::from_str::(&body_str) { + println!("Received event {event_msg:?}"); + } else { + println!("Received event with malformed body."); + } + } else { + println!("Received event with no body."); + } + } _ => { println!("Error handling packet"); } @@ -272,6 +404,6 @@ impl FCastSession<'_> { } pub fn shutdown(&mut self) -> Result<(), std::io::Error> { - return self.stream.transport_shutdown(); + self.stream.transport_shutdown() } -} \ No newline at end of file +} diff --git a/senders/terminal/src/file_server.rs b/senders/terminal/src/file_server.rs deleted file mode 100644 index 2c9547b..0000000 --- a/senders/terminal/src/file_server.rs +++ /dev/null @@ -1,36 +0,0 @@ -struct FileServer { - base_url: String, - base_path: String, -} - -impl FileServer { - fn new(base_url: String, base_path: String) -> Self { - FileServer { base_url, base_path } - } - - async fn serve(&self) { - let file_server = warp::fs::dir(&self.base_path); - warp::serve(file_server).run(([127, 0, 0, 1], 3030)).await; - } - - fn get_url(&self, file_name: &str) -> String { - format!("{}/{}", self.base_url, file_name) - } -} - -pub async fn host_file_and_get_url(file_path: &str) -> Result> { - let file_name = Path::new(file_path).file_name().ok_or("Invalid file path")?.to_str().ok_or("Invalid file name")?; - let file_server = FileServer::new("http://127.0.0.1:3030".to_string(), "path/to/hosted/files".to_string()); - - // Copy the file to the hosting directory - let destination = Path::new(&file_server.base_path).join(file_name); - tokio::fs::copy(file_path, &destination).await?; - - // Start the server if not already running - // This part needs synchronization in a real-world scenario - tokio::spawn(async move { - file_server.serve().await; - }); - - Ok(file_server.get_url(file_name)) -} \ No newline at end of file diff --git a/senders/terminal/src/lib.rs b/senders/terminal/src/lib.rs new file mode 100644 index 0000000..b21a5b8 --- /dev/null +++ b/senders/terminal/src/lib.rs @@ -0,0 +1,3 @@ +pub mod fcastsession; +pub mod models; +pub mod transport; diff --git a/senders/terminal/src/main.rs b/senders/terminal/src/main.rs index 8662edc..8b26f19 100644 --- a/senders/terminal/src/main.rs +++ b/senders/terminal/src/main.rs @@ -1,27 +1,27 @@ -mod models; -mod fcastsession; -mod transport; - use clap::{App, Arg, SubCommand}; -use tiny_http::{Server, Response, ListenAddr, Header}; -use tungstenite::stream::MaybeTlsStream; -use url::Url; +use fcast::models::v3; +use fcast::transport::WebSocket; use std::collections::HashMap; use std::net::IpAddr; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::thread::JoinHandle; -use std::{thread, fs}; use std::time::Instant; +use std::{fs, thread}; use std::{io::Read, net::TcpStream}; -use std::sync::atomic::{ AtomicBool, Ordering}; use std::{sync::Arc, time::Duration}; +use tiny_http::{Header, ListenAddr, Response, Server}; +use tungstenite::stream::MaybeTlsStream; +use url::Url; -use crate::fcastsession::Opcode; -use crate::models::{SetVolumeMessage, SetSpeedMessage}; -use crate::{models::{PlayMessage, SeekMessage}, fcastsession::FCastSession}; +use fcast::fcastsession::Opcode; +use fcast::{ + fcastsession::FCastSession, + models::{SeekMessage, SetSpeedMessage, SetVolumeMessage}, +}; -fn main() { +fn main() { if let Err(e) = run() { println!("Failed due to error: {}", e) } @@ -30,127 +30,173 @@ fn main() { fn run() -> Result<(), Box> { let app = App::new("Media Control") .about("Control media playback") - .arg(Arg::with_name("connection_type") - .short('c') - .long("connection_type") - .value_name("CONNECTION_TYPE") - .help("Type of connection: tcp or ws (websocket)") - .required(false) - .default_value("tcp") - .takes_value(true)) - .arg(Arg::with_name("host") - .short('h') - .long("host") - .value_name("Host") - .help("The host address to send the command to") - .required(true) - .takes_value(true)) - .arg(Arg::with_name("port") - .short('p') - .long("port") - .value_name("PORT") - .help("The port to send the command to") - .required(false) - .takes_value(true)) - .subcommand(SubCommand::with_name("play") - .about("Play media") - .arg(Arg::with_name("mime_type") - .short('m') - .long("mime_type") - .value_name("MIME_TYPE") - .help("Mime type (e.g., video/mp4)") - .required_unless_present("file") - .takes_value(true) - ) - .arg(Arg::with_name("file") - .short('f') - .long("file") - .value_name("File") - .help("File content to play") - .required(false) - .takes_value(true)) - .arg(Arg::with_name("url") - .short('u') - .long("url") - .value_name("URL") - .help("URL to the content") - .required(false) - .takes_value(true) - ) - .arg(Arg::with_name("content") + .arg( + Arg::with_name("connection_type") .short('c') - .long("content") - .value_name("CONTENT") - .help("The actual content") + .long("connection_type") + .value_name("CONNECTION_TYPE") + .help("Type of connection: tcp or ws (websocket)") .required(false) - .takes_value(true) - ) - .arg(Arg::with_name("timestamp") - .short('t') - .long("timestamp") - .value_name("TIMESTAMP") - .help("Timestamp to start playing") - .required(false) - .default_value("0") - .takes_value(true) - ) - .arg(Arg::with_name("speed") - .short('s') - .long("speed") - .value_name("SPEED") - .help("Factor to multiply playback speed by") - .required(false) - .default_value("1") - .takes_value(true) - ) - .arg(Arg::with_name("header") - .short('H') - .long("header") - .value_name("HEADER") - .help("Custom request headers in key:value format") - .required(false) - .multiple_occurrences(true) - ) + .default_value("tcp") + .takes_value(true), ) - .subcommand(SubCommand::with_name("seek") - .about("Seek to a timestamp") - .arg(Arg::with_name("timestamp") - .short('t') - .long("timestamp") - .value_name("TIMESTAMP") - .help("Timestamp to start playing") - .required(true) - .takes_value(true) - ), + .arg( + Arg::with_name("host") + .short('h') + .long("host") + .value_name("Host") + .help("The host address to send the command to") + .default_value("127.0.0.1") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("port") + .short('p') + .long("port") + .value_name("PORT") + .help("The port to send the command to") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("subscribe") + .short('s') + .long("subscribe") + .value_name("EVENTS") + .help("A comma separated list of events to subscribe to (e.g. MediaItemStart,KeyDown). \ + Available events: [MediaItemStart, MediaItemEnd, MediaItemChange, KeyDown, KeyUp]") + .required(false) + .takes_value(true), + ) + .subcommand( + SubCommand::with_name("play") + .about("Play media") + .arg( + Arg::with_name("mime_type") + .short('m') + .long("mime_type") + .value_name("MIME_TYPE") + .help("Mime type (e.g., video/mp4)") + .takes_value(true), + ) + .arg( + Arg::with_name("file") + .short('f') + .long("file") + .value_name("File") + .help("File content to play") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("url") + .short('u') + .long("url") + .value_name("URL") + .help("URL to the content") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("content") + .short('c') + .long("content") + .value_name("CONTENT") + .help("The actual content") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("timestamp") + .short('t') + .long("timestamp") + .value_name("TIMESTAMP") + .help("Timestamp to start playing") + .required(false) + .default_value("0") + .takes_value(true), + ) + .arg( + Arg::with_name("speed") + .short('s') + .long("speed") + .value_name("SPEED") + .help("Factor to multiply playback speed by") + .required(false) + .default_value("1") + .takes_value(true), + ) + .arg( + Arg::with_name("header") + .short('H') + .long("header") + .value_name("HEADER") + .help("Custom request headers in key:value format") + .required(false) + .multiple_occurrences(true), + ), + ) + .subcommand( + SubCommand::with_name("seek") + .about("Seek to a timestamp") + .arg( + Arg::with_name("timestamp") + .short('t') + .long("timestamp") + .value_name("TIMESTAMP") + .help("Timestamp to start playing") + .required(true) + .takes_value(true), + ), ) .subcommand(SubCommand::with_name("pause").about("Pause media")) .subcommand(SubCommand::with_name("resume").about("Resume media")) .subcommand(SubCommand::with_name("stop").about("Stop media")) .subcommand(SubCommand::with_name("listen").about("Listen to incoming events")) - .subcommand(SubCommand::with_name("setvolume").about("Set the volume") - .arg(Arg::with_name("volume") - .short('v') - .long("volume") - .value_name("VOLUME") - .help("Volume level (0-1)") - .required(true) - .takes_value(true))) - .subcommand(SubCommand::with_name("setspeed").about("Set the playback speed") - .arg(Arg::with_name("speed") - .short('s') - .long("speed") - .value_name("SPEED") - .help("Factor to multiply playback speed by") - .required(true) - .takes_value(true)) + .subcommand( + SubCommand::with_name("setvolume") + .about("Set the volume") + .arg( + Arg::with_name("volume") + .short('v') + .long("volume") + .value_name("VOLUME") + .help("Volume level (0-1)") + .required(true) + .takes_value(true), + ), + ) + .subcommand( + SubCommand::with_name("setspeed") + .about("Set the playback speed") + .arg( + Arg::with_name("speed") + .short('s') + .long("speed") + .value_name("SPEED") + .help("Factor to multiply playback speed by") + .required(true) + .takes_value(true), + ), + ) + .subcommand( + SubCommand::with_name("set_playlist_item") + .about("") + .arg( + Arg::with_name("item_index") + .short('i') + .long("item_index") + .value_name("INDEX") + .help("Index of the item in the playlist that should be play") + .required(true) + .takes_value(true), + ) ); let matches = app.get_matches(); - let host = match matches.value_of("host") { - Some(s) => s, - _ => return Err("Host is required.".into()) - }; + let host = matches.value_of("host").expect("host has default value"); let connection_type = matches.value_of("connection_type").unwrap_or("tcp"); @@ -159,8 +205,10 @@ fn run() -> Result<(), Box> { _ => match connection_type { "tcp" => "46899", "ws" => "46898", - _ => return Err("Unknown connection type, cannot automatically determine port.".into()) - } + _ => { + return Err("Unknown connection type, cannot automatically determine port.".into()) + } + }, }; let local_ip: Option; @@ -169,141 +217,147 @@ fn run() -> Result<(), Box> { println!("Connecting via TCP to host={} port={}...", host, port); let stream = TcpStream::connect(format!("{}:{}", host, port))?; local_ip = Some(stream.local_addr()?.ip()); - FCastSession::new(stream) - }, + FCastSession::connect(stream)? + } "ws" => { println!("Connecting via WebSocket to host={} port={}...", host, port); let url = Url::parse(format!("ws://{}:{}", host, port).as_str())?; let (stream, _) = tungstenite::connect(url)?; local_ip = match stream.get_ref() { MaybeTlsStream::Plain(ref stream) => Some(stream.local_addr()?.ip()), - _ => return Err("Established connection type is not plain.".into()) + _ => return Err("Established connection type is not plain.".into()), }; - FCastSession::new(stream) + let stream = WebSocket::new(stream); + FCastSession::connect(stream)? } _ => return Err("Invalid connection type.".into()), }; println!("Connection established."); - + if let Some(subscriptions) = matches.value_of("subscribe") { + let subs = subscriptions.split(','); + for sub in subs { + let event = match sub.to_lowercase().as_str() { + "mediaitemstart" => v3::EventType::MediaItemStart, + "mediaitemend" => v3::EventType::MediaItemEnd, + "mediaitemchange" => v3::EventType::MediaItemChange, + "keydown" => v3::EventType::KeyDown, + "keyup" => v3::EventType::KeyUp, + _ => { + println!("Invalid event in subscriptions list: {sub}"); + continue; + } + }; + session.subscribe(event)?; + println!("Subscribed to {event:?} events"); + } + } + let mut join_handle: Option>> = None; if let Some(play_matches) = matches.subcommand_matches("play") { let file_path = play_matches.value_of("file"); + fn default_mime_type() -> String { + println!("No mime type provided via the `--mime_type` argument. Using default (application/octet-stream)"); + "application/octet-stream".to_string() + } + let mime_type = match play_matches.value_of("mime_type") { Some(s) => s.to_string(), - _ => { - if file_path.is_none() { - return Err("MIME type is required.".into()); - } - match file_path.unwrap().split('.').last() { + _ => match file_path { + Some(path) => match path.split('.').next_back() { Some("mkv") => "video/x-matroska".to_string(), Some("mov") => "video/quicktime".to_string(), Some("mp4") | Some("m4v") => "video/mp4".to_string(), Some("mpg") | Some("mpeg") => "video/mpeg".to_string(), Some("webm") => "video/webm".to_string(), - _ => return Err("MIME type is required.".into()), - } - } + _ => default_mime_type(), + }, + None => default_mime_type(), + }, }; let time = match play_matches.value_of("timestamp") { Some(s) => s.parse::().ok(), - _ => None + _ => None, }; let speed = match play_matches.value_of("speed") { Some(s) => s.parse::().ok(), - _ => None + _ => None, }; - let headers = play_matches.values_of("header") - .map(|values| values - .filter_map(|s| { - let mut parts = s.splitn(2, ':'); - if let (Some(key), Some(value)) = (parts.next(), parts.next()) { - Some((key.trim().to_string(), value.trim().to_string())) - } else { - None - } - } - ).collect::>()); + let headers = play_matches.values_of("header").map(|values| { + values + .filter_map(|s| { + let mut parts = s.splitn(2, ':'); + if let (Some(key), Some(value)) = (parts.next(), parts.next()) { + Some((key.trim().to_string(), value.trim().to_string())) + } else { + None + } + }) + .collect::>() + }); - let mut play_message = if let Some(file_path) = file_path { + #[allow(unused_assignments)] + let mut url = None; + let mut content = None; + + if let Some(file_path) = file_path { match local_ip { Some(lip) => { let running = Arc::new(AtomicBool::new(true)); - let r = running.clone(); + let r = running.clone(); ctrlc::set_handler(move || { - println!("Ctrl+C triggered, server will stop when onging request finishes..."); + println!( + "Ctrl+C triggered, server will stop when onging request finishes..." + ); r.store(false, Ordering::SeqCst); - }).expect("Error setting Ctrl-C handler"); - + }) + .expect("Error setting Ctrl-C handler"); + println!("Waiting for Ctrl+C..."); let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?; - let url = result.0; + url = Some(result.0); join_handle = Some(result.1); - - PlayMessage::new( - mime_type, - Some(url), - None, - time, - speed, - headers - ) - }, - _ => return Err("Local IP was not able to be resolved.".into()) + } + _ => return Err("Local IP was not able to be resolved.".into()), } } else { - PlayMessage::new( - mime_type, - match play_matches.value_of("url") { - Some(s) => Some(s.to_string()), - _ => None - }, - match play_matches.value_of("content") { - Some(s) => Some(s.to_string()), - _ => None - }, - time, - speed, - headers - ) - }; + url = play_matches.value_of("url").map(|s| s.to_owned()); + content = play_matches.value_of("content").map(|s| s.to_owned()); + } - if play_message.content.is_none() && play_message.url.is_none() { + if content.is_none() && url.is_none() { println!("Reading content from stdin..."); let mut buffer = String::new(); std::io::stdin().read_to_string(&mut buffer)?; - play_message.content = Some(buffer); + content = Some(buffer); } - let json = serde_json::to_string(&play_message); - println!("Sent play {:?}", json); - - session.send_message(Opcode::Play, &play_message)?; + session.send_play_message(mime_type, url, content, time, speed, headers)?; } else if let Some(seek_matches) = matches.subcommand_matches("seek") { let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") { Some(s) => s.parse::()?, - _ => return Err("Timestamp is required.".into()) + _ => return Err("Timestamp is required.".into()), }); println!("Sent seek {:?}", seek_message); session.send_message(Opcode::Seek, &seek_message)?; - } else if let Some(_) = matches.subcommand_matches("pause") { + } else if matches.subcommand_matches("pause").is_some() { println!("Sent pause"); session.send_empty(Opcode::Pause)?; - } else if let Some(_) = matches.subcommand_matches("resume") { + } else if matches.subcommand_matches("resume").is_some() { println!("Sent resume"); session.send_empty(Opcode::Resume)?; - } else if let Some(_) = matches.subcommand_matches("stop") { + } else if matches.subcommand_matches("stop").is_some() { println!("Sent stop"); session.send_empty(Opcode::Stop)?; - } else if let Some(_) = matches.subcommand_matches("listen") { + } else if matches.subcommand_matches("listen").is_some() { println!("Starter listening to events..."); let running = Arc::new(AtomicBool::new(true)); @@ -312,7 +366,8 @@ fn run() -> Result<(), Box> { ctrlc::set_handler(move || { println!("Ctrl+C triggered..."); r.store(false, Ordering::SeqCst); - }).expect("Error setting Ctrl-C handler"); + }) + .expect("Error setting Ctrl-C handler"); println!("Waiting for Ctrl+C..."); @@ -322,17 +377,23 @@ fn run() -> Result<(), Box> { } else if let Some(setvolume_matches) = matches.subcommand_matches("setvolume") { let setvolume_message = SetVolumeMessage::new(match setvolume_matches.value_of("volume") { Some(s) => s.parse::()?, - _ => return Err("Timestamp is required.".into()) + _ => return Err("Timestamp is required.".into()), }); println!("Sent setvolume {:?}", setvolume_message); session.send_message(Opcode::SetVolume, &setvolume_message)?; } else if let Some(setspeed_matches) = matches.subcommand_matches("setspeed") { let setspeed_message = SetSpeedMessage::new(match setspeed_matches.value_of("speed") { Some(s) => s.parse::()?, - _ => return Err("Speed is required.".into()) + _ => return Err("Speed is required.".into()), }); println!("Sent setspeed {:?}", setspeed_message); session.send_message(Opcode::SetSpeed, &setspeed_message)?; + } else if let Some(set_playlist_item_matches) = matches.subcommand_matches("set_playlist_item") { + let message = v3::SetPlaylistItemMessage { + item_index: set_playlist_item_matches.value_of("item_index").expect("item_index required").parse::()?, + }; + session.send_message(Opcode::SetPlaylistItem, &message)?; + println!("Sent set_playlist_item {message:?}"); } else { println!("Invalid command. Use --help for more information."); std::process::exit(1); @@ -340,7 +401,7 @@ fn run() -> Result<(), Box> { println!("Waiting on other threads..."); if let Some(v) = join_handle { - if let Err(_) = v.join() { + if v.join().is_err() { return Err("Failed to join thread.".into()); } } @@ -364,19 +425,19 @@ impl ServerState { } } -fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, running: &Arc) -> Result<(String, thread::JoinHandle>), String> { +fn host_file_and_get_url( + local_ip: &IpAddr, + file_path: &str, + mime_type: &String, + running: &Arc, +) -> Result<(String, thread::JoinHandle>), String> { let local_ip_str = if local_ip.is_ipv6() { format!("[{}]", local_ip) } else { format!("{}", local_ip) }; - let server = { - let this = Server::http(format!("{local_ip_str}:0")); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|e| format!("Failed to create server: {}", e))(e)), - } - }?; + let server = Server::http(format!("{local_ip_str}:0")) + .map_err(|err| format!("Failed to create server: {err}"))?; let url = match server.server_addr() { ListenAddr::IP(addr) => format!("http://{local_ip_str}:{}/", addr.port()), @@ -399,14 +460,9 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, } let should_break = { - let state = { - let this = state.lock(); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|e| format!("Mutex error: {}", e))(e)), - } - }?; - state.active_connections == 0 && state.last_request_time.elapsed() > Duration::from_secs(300) + let state = state.lock().unwrap(); + state.active_connections == 0 + && state.last_request_time.elapsed() > Duration::from_secs(300) }; if should_break { @@ -418,34 +474,18 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, Ok(Some(request)) => { println!("Request received."); - let mut state = { - let this = state.lock(); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|e| format!("Mutex error: {}", e))(e)), - } - }?; + let mut state = state.lock().unwrap(); state.active_connections += 1; state.last_request_time = Instant::now(); - let file = { - let this = fs::File::open(&file_path_clone); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|_| "Failed to open file.".to_string())(e)), - } - }?; + let file = fs::File::open(&file_path_clone) + .map_err(|_| "Failed to open file.".to_owned())?; - let content_type_header = { - let this = Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str()); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|_| "Failed to open file.".to_string())(e)), - } - }?; + let content_type_header = + Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str()) + .map_err(|_| "Failed to open file.".to_owned())?; - let response = Response::from_file(file) - .with_header(content_type_header); + let response = Response::from_file(file).with_header(content_type_header); if let Err(e) = request.respond(response) { println!("Failed to respond to request: {}", e); diff --git a/senders/terminal/src/models.rs b/senders/terminal/src/models.rs deleted file mode 100644 index f3a5d38..0000000 --- a/senders/terminal/src/models.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::collections::HashMap; - -use serde::{Serialize, Deserialize}; - -#[derive(Serialize, Debug)] -pub struct PlayMessage { - pub container: String, - pub url: Option, - pub content: Option, - pub time: Option, - pub speed: Option, - pub headers: Option> -} - -impl PlayMessage { - pub fn new(container: String, url: Option, content: Option, time: Option, speed: Option, headers: Option>) -> Self { - Self { container, url, content, time, speed, headers } - } -} - -#[derive(Serialize, Debug)] -pub struct SeekMessage { - pub time: f64, -} - -impl SeekMessage { - pub fn new(time: f64) -> Self { - Self { time } - } -} - -#[derive(Deserialize, Debug)] -pub struct PlaybackUpdateMessage { - #[serde(rename = "generationTime")] - pub generation_time: u64, - pub time: f64, - pub duration: f64, - pub speed: f64, - pub state: u8 //0 = None, 1 = Playing, 2 = Paused -} - -#[derive(Deserialize, Debug)] -pub struct VolumeUpdateMessage { - #[serde(rename = "generationTime")] - pub generation_time: u64, - pub volume: f64 //(0-1) -} - -#[derive(Serialize, Debug)] -pub struct SetVolumeMessage { - pub volume: f64, -} - -impl SetVolumeMessage { - pub fn new(volume: f64) -> Self { - Self { volume } - } -} - -#[derive(Serialize, Debug)] -pub struct SetSpeedMessage { - pub speed: f64, -} - -impl SetSpeedMessage { - pub fn new(speed: f64) -> Self { - Self { speed } - } -} - -#[derive(Deserialize, Debug)] -pub struct PlaybackErrorMessage { - pub message: String, -} - -#[derive(Deserialize, Debug)] -pub struct VersionMessage { - pub version: u64, -} \ No newline at end of file diff --git a/senders/terminal/src/models/mod.rs b/senders/terminal/src/models/mod.rs new file mode 100644 index 0000000..c563b7c --- /dev/null +++ b/senders/terminal/src/models/mod.rs @@ -0,0 +1,54 @@ +use serde::{Deserialize, Serialize}; + +pub mod v2; +pub mod v3; + +#[derive(Deserialize, Debug)] +pub struct PlaybackErrorMessage { + pub message: String, +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct VersionMessage { + pub version: u64, +} + +#[derive(Serialize, Debug)] +pub struct SetSpeedMessage { + pub speed: f64, +} + +impl SetSpeedMessage { + pub fn new(speed: f64) -> Self { + Self { speed } + } +} + +#[derive(Deserialize, Debug)] +pub struct VolumeUpdateMessage { + #[serde(rename = "generationTime")] + pub generation_time: u64, + pub volume: f64, //(0-1) +} + +#[derive(Serialize, Debug)] +pub struct SetVolumeMessage { + pub volume: f64, +} + +impl SetVolumeMessage { + pub fn new(volume: f64) -> Self { + Self { volume } + } +} + +#[derive(Serialize, Debug)] +pub struct SeekMessage { + pub time: f64, +} + +impl SeekMessage { + pub fn new(time: f64) -> Self { + Self { time } + } +} diff --git a/senders/terminal/src/models/v2.rs b/senders/terminal/src/models/v2.rs new file mode 100644 index 0000000..73843e9 --- /dev/null +++ b/senders/terminal/src/models/v2.rs @@ -0,0 +1,43 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Debug)] +pub struct PlayMessage { + pub container: String, + pub url: Option, + pub content: Option, + pub time: Option, + pub speed: Option, + pub headers: Option>, +} + +impl PlayMessage { + pub fn new( + container: String, + url: Option, + content: Option, + time: Option, + speed: Option, + headers: Option>, + ) -> Self { + Self { + container, + url, + content, + time, + speed, + headers, + } + } +} + +#[derive(Deserialize, Debug)] +pub struct PlaybackUpdateMessage { + #[serde(rename = "generationTime")] + pub generation_time: u64, + pub time: f64, + pub duration: f64, + pub speed: f64, + pub state: u8, //0 = None, 1 = Playing, 2 = Paused +} diff --git a/senders/terminal/src/models/v3.rs b/senders/terminal/src/models/v3.rs new file mode 100644 index 0000000..c2432a6 --- /dev/null +++ b/senders/terminal/src/models/v3.rs @@ -0,0 +1,723 @@ +use std::collections::HashMap; + +use serde::{de, ser, Deserialize, Serialize}; +use serde_json::{json, Value}; +use serde_repr::{Deserialize_repr, Serialize_repr}; + +macro_rules! get_from_map { + ($map:expr, $key:expr) => { + $map.get($key).ok_or(de::Error::missing_field($key)) + }; +} + +#[derive(Debug, PartialEq, Clone)] +pub enum MetadataObject { + Generic { + title: Option, + thumbnail_url: Option, + custom: Value, + }, +} + +impl Serialize for MetadataObject { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + MetadataObject::Generic { + title, + thumbnail_url, + custom, + } => { + let mut map = serde_json::Map::new(); + map.insert("type".to_owned(), json!(0u64)); + map.insert( + "title".to_owned(), + match title { + Some(t) => Value::String(t.to_owned()), + None => Value::Null, + }, + ); + map.insert( + "thumbnailUrl".to_owned(), + match thumbnail_url { + Some(t) => Value::String(t.to_owned()), + None => Value::Null, + }, + ); + map.insert("custom".to_owned(), custom.clone()); + map.serialize(serializer) + } + } + } +} + +impl<'de> Deserialize<'de> for MetadataObject { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let mut map = serde_json::Map::deserialize(deserializer)?; + + let type_ = map + .remove("type") + .ok_or(de::Error::missing_field("type"))? + .as_u64() + .ok_or(de::Error::custom("`type` is not an integer"))?; + let rest = Value::Object(map); + + match type_ { + 0 => { + let title = match rest.get("title") { + Some(t) => Some( + t.as_str() + .ok_or(de::Error::custom("`title` is not a string"))? + .to_owned(), + ), + None => None, + }; + let thumbnail_url = match rest.get("thumbnailUrl") { + Some(t) => Some( + t.as_str() + .ok_or(de::Error::custom("`thumbnailUrl` is not a string"))? + .to_owned(), + ), + None => None, + }; + Ok(Self::Generic { + title, + thumbnail_url, + custom: rest + .get("custom") + .ok_or(de::Error::missing_field("custom"))? + .clone(), + }) + } + _ => Err(de::Error::custom(format!("Unknown metadata type {type_}"))), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct PlayMessage { + /// The MIME type (video/mp4) + pub container: String, + // The URL to load (optional) + pub url: Option, + // The content to load (i.e. a DASH manifest, json content, optional) + pub content: Option, + // The time to start playing in seconds + pub time: Option, + // The desired volume (0-1) + pub volume: Option, + // The factor to multiply playback speed by (defaults to 1.0) + pub speed: Option, + // HTTP request headers to add to the play request Map + pub headers: Option>, + pub metadata: Option, +} + +#[derive(Deserialize_repr, Serialize_repr, Debug)] +#[repr(u8)] +pub enum ContentType { + Playlist = 0, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct MediaItem { + /// The MIME type (video/mp4) + pub container: String, + /// The URL to load (optional) + pub url: Option, + /// The content to load (i.e. a DASH manifest, json content, optional) + pub content: Option, + /// The time to start playing in seconds + pub time: Option, + /// The desired volume (0-1) + pub volume: Option, + /// The factor to multiply playback speed by (defaults to 1.0) + pub speed: Option, + /// Indicates if the receiver should preload the media item + pub cache: Option, + /// Indicates how long the item content is presented on screen in seconds + #[serde(rename = "showDuration")] + pub show_duration: Option, + /// HTTP request headers to add to the play request Map + pub headers: Option>, + pub metadata: Option, +} + +#[derive(Serialize, Debug)] +pub struct PlaylistContent { + #[serde(rename = "type")] + variant: ContentType, + pub items: Vec, + /// Start position of the first item to play from the playlist + pub offset: Option, // int or float? + /// The desired volume (0-1) + pub volume: Option, + /// The factor to multiply playback speed by (defaults to 1.0) + pub speed: Option, + /// Count of media items should be pre-loaded forward from the current view index + #[serde(rename = "forwardCache")] + pub forward_cache: Option, + /// Count of media items should be pre-loaded backward from the current view index + #[serde(rename = "backwardCache")] + pub backward_cache: Option, + pub metadata: Option, +} + +#[derive(Serialize_repr, Deserialize_repr, Debug)] +#[repr(u8)] +pub enum PlaybackState { + Idle = 0, + Playing = 1, + Paused = 2, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct PlaybackUpdateMessage { + // The time the packet was generated (unix time milliseconds) + #[serde(rename = "generationTime")] + pub generation_time: u64, + // The playback state + pub state: PlaybackState, + // The current time playing in seconds + pub time: Option, + // The duration in seconds + pub duration: Option, + // The playback speed factor + pub speed: Option, + // The playlist item index currently being played on receiver + #[serde(rename = "itemIndex")] + pub item_index: Option, +} + +#[derive(Serialize, Debug)] +pub struct InitialSenderMessage { + #[serde(rename = "displayName")] + pub display_name: Option, + #[serde(rename = "appName")] + pub app_name: Option, + #[serde(rename = "appVersion")] + pub app_version: Option, +} + +#[derive(Deserialize, Debug)] +pub struct InitialReceiverMessage { + #[serde(rename = "displayName")] + pub display_name: Option, + #[serde(rename = "appName")] + pub app_name: Option, + #[serde(rename = "appVersion")] + pub app_version: Option, + #[serde(rename = "playData")] + pub play_data: Option, +} + +#[derive(Deserialize, Debug)] +pub struct PlayUpdateMessage { + #[serde(rename = "generationTime")] + pub generation_time: Option, + #[serde(rename = "playData")] + pub play_data: Option, +} + +#[derive(Serialize, Debug)] +pub struct SetPlaylistItemMessage { + #[serde(rename = "itemIndex")] + pub item_index: u64, +} + +#[derive(Deserialize, Debug)] +pub enum KeyNames { + ArrowLeft, + ArrowRight, + ArrowUp, + ArrowDown, + Enter, +} + +impl KeyNames { + pub fn all() -> Vec { + vec![ + "ArrowLeft".to_owned(), + "ArrowRight".to_owned(), + "ArrowUp".to_owned(), + "ArrowDown".to_owned(), + "Enter".to_owned(), + ] + } +} + +#[derive(Debug, PartialEq)] +pub enum EventSubscribeObject { + MediaItemStart, + MediaItemEnd, + MediaItemChanged, + KeyDown { keys: Vec }, + KeyUp { keys: Vec }, +} + +impl Serialize for EventSubscribeObject { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serde_json::Map::new(); + let type_val: u64 = match self { + EventSubscribeObject::MediaItemStart => 0, + EventSubscribeObject::MediaItemEnd => 1, + EventSubscribeObject::MediaItemChanged => 2, + EventSubscribeObject::KeyDown { .. } => 3, + EventSubscribeObject::KeyUp { .. } => 4, + }; + + map.insert("type".to_owned(), json!(type_val)); + + let keys = match self { + EventSubscribeObject::KeyDown { keys } => Some(keys), + EventSubscribeObject::KeyUp { keys } => Some(keys), + _ => None, + }; + if let Some(keys) = keys { + map.insert("keys".to_owned(), json!(keys)); + } + + map.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for EventSubscribeObject { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let mut map = serde_json::Map::deserialize(deserializer)?; + let type_ = map + .remove("type") + .ok_or(de::Error::missing_field("type"))? + .as_u64() + .ok_or(de::Error::custom("`type` is not an integer"))?; + let rest = Value::Object(map); + + match type_ { + 0 => Ok(Self::MediaItemStart), + 1 => Ok(Self::MediaItemEnd), + 2 => Ok(Self::MediaItemChanged), + 3 | 4 => { + let keys = get_from_map!(rest, "keys")? + .as_array() + .ok_or(de::Error::custom("`type` is not an array"))? + .iter() + .map(|v| v.as_str().map(|s| s.to_owned())) + .collect::>>() + .ok_or(de::Error::custom("`type` is not an array of strings"))?; + if type_ == 3 { + Ok(Self::KeyDown { keys }) + } else { + Ok(Self::KeyUp { keys }) + } + } + _ => Err(de::Error::custom(format!("Unknown event type {type_}"))), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SubscribeEventMessage { + pub event: EventSubscribeObject, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct UnsubscribeEventMessage { + pub event: EventSubscribeObject, +} + +#[derive(Debug, PartialEq, Copy, Clone)] +#[repr(u8)] +pub enum EventType { + MediaItemStart = 0, + MediaItemEnd = 1, + MediaItemChange = 2, + KeyDown = 3, + KeyUp = 4, +} + +#[derive(Debug, PartialEq)] +#[allow(clippy::large_enum_variant)] +pub enum EventObject { + MediaItem { + variant: EventType, + item: MediaItem, + }, + Key { + variant: EventType, + key: String, + repeat: bool, + handled: bool, + }, +} + +impl Serialize for EventObject { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serde_json::Map::new(); + + match self { + EventObject::MediaItem { variant, item } => { + map.insert("type".to_owned(), json!(*variant as u8)); + map.insert( + "item".to_owned(), + serde_json::to_value(item).map_err(ser::Error::custom)?, + ); + } + EventObject::Key { + variant, + key, + repeat, + handled, + } => { + map.insert("type".to_owned(), json!(*variant as u8)); + map.insert( + "key".to_owned(), + serde_json::to_value(key).map_err(ser::Error::custom)?, + ); + map.insert( + "repeat".to_owned(), + serde_json::to_value(repeat).map_err(ser::Error::custom)?, + ); + map.insert( + "handled".to_owned(), + serde_json::to_value(handled).map_err(ser::Error::custom)?, + ); + } + } + + map.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for EventObject { + fn deserialize(deserializer: D) -> Result + where + D: de::Deserializer<'de>, + { + let mut map = serde_json::Map::deserialize(deserializer)?; + let type_ = map + .remove("type") + .ok_or(de::Error::missing_field("type"))? + .as_u64() + .ok_or(de::Error::custom("`type` is not an integer"))?; + let rest = Value::Object(map); + + match type_ { + #[allow(clippy::manual_range_patterns)] + 0 | 1 | 2 => { + let variant = match type_ { + 0 => EventType::MediaItemStart, + 1 => EventType::MediaItemEnd, + _ => EventType::MediaItemChange, + }; + let item = get_from_map!(rest, "item")?; + Ok(Self::MediaItem { + variant, + item: MediaItem::deserialize(item).map_err(de::Error::custom)?, + }) + } + 3 | 4 => { + let variant = if type_ == 3 { + EventType::KeyDown + } else { + EventType::KeyUp + }; + Ok(Self::Key { + variant, + key: get_from_map!(rest, "key")? + .as_str() + .ok_or(de::Error::custom("`key` is not a string"))? + .to_owned(), + repeat: get_from_map!(rest, "repeat")? + .as_bool() + .ok_or(de::Error::custom("`repeat` is not a bool"))?, + handled: get_from_map!(rest, "handled")? + .as_bool() + .ok_or(de::Error::custom("`handled` is not a bool"))?, + }) + } + _ => Err(de::Error::custom(format!("Unknown event type {type_}"))), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct EventMessage { + #[serde(rename = "generationTime")] + pub generation_time: u64, + event: EventObject, +} + +#[cfg(test)] +mod tests { + use super::*; + + macro_rules! s { + ($s:expr) => { + ($s).to_string() + }; + } + + #[test] + fn serialize_metadata_object() { + assert_eq!( + &serde_json::to_string(&MetadataObject::Generic { + title: Some(s!("abc")), + thumbnail_url: Some(s!("def")), + custom: serde_json::Value::Null, + }) + .unwrap(), + r#"{"custom":null,"thumbnailUrl":"def","title":"abc","type":0}"# + ); + assert_eq!( + &serde_json::to_string(&MetadataObject::Generic { + title: None, + thumbnail_url: None, + custom: serde_json::Value::Null, + }) + .unwrap(), + r#"{"custom":null,"thumbnailUrl":null,"title":null,"type":0}"# + ); + } + + #[test] + fn deserialize_metadata_object() { + assert_eq!( + serde_json::from_str::( + r#"{"type":0,"title":"abc","thumbnailUrl":"def","custom":null}"# + ) + .unwrap(), + MetadataObject::Generic { + title: Some(s!("abc")), + thumbnail_url: Some(s!("def")), + custom: serde_json::Value::Null, + } + ); + assert_eq!( + serde_json::from_str::(r#"{"type":0,"custom":null}"#).unwrap(), + MetadataObject::Generic { + title: None, + thumbnail_url: None, + custom: serde_json::Value::Null, + } + ); + assert!(serde_json::from_str::(r#"{"type":1"#).is_err()); + } + + #[test] + fn serialize_event_sub_obj() { + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::MediaItemStart).unwrap(), + r#"{"type":0}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::MediaItemEnd).unwrap(), + r#"{"type":1}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::MediaItemChanged).unwrap(), + r#"{"type":2}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(), + r#"{"keys":[],"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(), + r#"{"keys":[],"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyUp { + keys: vec![s!("abc"), s!("def")] + }) + .unwrap(), + r#"{"keys":["abc","def"],"type":4}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { + keys: vec![s!("abc"), s!("def")] + }) + .unwrap(), + r#"{"keys":["abc","def"],"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { + keys: vec![s!("\"\"")] + }) + .unwrap(), + r#"{"keys":["\"\""],"type":3}"# + ); + } + + #[test] + fn deserialize_event_sub_obj() { + assert_eq!( + serde_json::from_str::(r#"{"type":0}"#).unwrap(), + EventSubscribeObject::MediaItemStart + ); + assert_eq!( + serde_json::from_str::(r#"{"type":1}"#).unwrap(), + EventSubscribeObject::MediaItemEnd + ); + assert_eq!( + serde_json::from_str::(r#"{"type":2}"#).unwrap(), + EventSubscribeObject::MediaItemChanged + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":[],"type":3}"#).unwrap(), + EventSubscribeObject::KeyDown { keys: vec![] } + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":[],"type":4}"#).unwrap(), + EventSubscribeObject::KeyUp { keys: vec![] } + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":["abc","def"],"type":3}"#) + .unwrap(), + EventSubscribeObject::KeyDown { + keys: vec![s!("abc"), s!("def")] + } + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":["abc","def"],"type":4}"#) + .unwrap(), + EventSubscribeObject::KeyUp { + keys: vec![s!("abc"), s!("def")] + } + ); + assert!(serde_json::from_str::(r#""type":5}"#).is_err()); + } + + const EMPTY_TEST_MEDIA_ITEM: MediaItem = MediaItem { + container: String::new(), + url: None, + content: None, + time: None, + volume: None, + speed: None, + cache: None, + show_duration: None, + headers: None, + metadata: None, + }; + const TEST_MEDIA_ITEM_JSON: &str = r#"{"cache":null,"container":"","content":null,"headers":null,"metadata":null,"showDuration":null,"speed":null,"time":null,"url":null,"volume":null}"#; + + #[test] + fn serialize_event_obj() { + assert_eq!( + serde_json::to_string(&EventObject::MediaItem { + variant: EventType::MediaItemStart, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + }) + .unwrap(), + format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"#), + ); + assert_eq!( + serde_json::to_string(&EventObject::MediaItem { + variant: EventType::MediaItemEnd, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + }) + .unwrap(), + format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"#), + ); + assert_eq!( + serde_json::to_string(&EventObject::MediaItem { + variant: EventType::MediaItemChange, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + }) + .unwrap(), + format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"#), + ); + assert_eq!( + &serde_json::to_string(&EventObject::Key { + variant: EventType::KeyDown, + key: s!(""), + repeat: false, + handled: false, + }) + .unwrap(), + r#"{"handled":false,"key":"","repeat":false,"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventObject::Key { + variant: EventType::KeyUp, + key: s!(""), + repeat: false, + handled: false, + }) + .unwrap(), + r#"{"handled":false,"key":"","repeat":false,"type":4}"# + ); + } + + #[test] + fn deserialize_event_obj() { + assert_eq!( + serde_json::from_str::(&format!( + r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"# + )) + .unwrap(), + EventObject::MediaItem { + variant: EventType::MediaItemStart, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + } + ); + assert_eq!( + serde_json::from_str::(&format!( + r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"# + )) + .unwrap(), + EventObject::MediaItem { + variant: EventType::MediaItemEnd, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + } + ); + assert_eq!( + serde_json::from_str::(&format!( + r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"# + )) + .unwrap(), + EventObject::MediaItem { + variant: EventType::MediaItemChange, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + } + ); + assert_eq!( + serde_json::from_str::( + r#"{"handled":false,"key":"","repeat":false,"type":3}"# + ) + .unwrap(), + EventObject::Key { + variant: EventType::KeyDown, + key: s!(""), + repeat: false, + handled: false, + } + ); + assert_eq!( + serde_json::from_str::( + r#"{"handled":false,"key":"","repeat":false,"type":4}"# + ) + .unwrap(), + EventObject::Key { + variant: EventType::KeyUp, + key: s!(""), + repeat: false, + handled: false, + } + ); + assert!(serde_json::from_str::(r#"{"type":5}"#).is_err()); + } +} diff --git a/senders/terminal/src/transport.rs b/senders/terminal/src/transport.rs index 655e513..e62d7f6 100644 --- a/senders/terminal/src/transport.rs +++ b/senders/terminal/src/transport.rs @@ -1,12 +1,14 @@ +use std::collections::VecDeque; use std::io::{Read, Write}; use std::net::TcpStream; +use tungstenite::protocol::WebSocket as TWebSocket; use tungstenite::Message; -use tungstenite::protocol::WebSocket; pub trait Transport { fn transport_read(&mut self, buf: &mut [u8]) -> Result; fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error>; fn transport_shutdown(&mut self) -> Result<(), std::io::Error>; + fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error>; } impl Transport for TcpStream { @@ -21,36 +23,156 @@ impl Transport for TcpStream { fn transport_shutdown(&mut self) -> Result<(), std::io::Error> { self.shutdown(std::net::Shutdown::Both) } + + fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> { + self.read_exact(buf) + } } -impl Transport for WebSocket { - fn transport_read(&mut self, buf: &mut [u8]) -> Result { - match self.read() { - Ok(Message::Binary(data)) => { - let len = std::cmp::min(buf.len(), data.len()); - buf[..len].copy_from_slice(&data[..len]); - Ok(len) - }, - _ => Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid message type")) +pub struct WebSocket +where + T: Read + Write, +{ + inner: TWebSocket, + buffer: VecDeque, +} + +impl WebSocket +where + T: Read + Write, +{ + pub fn new(web_socket: TWebSocket) -> Self { + Self { + inner: web_socket, + buffer: VecDeque::new(), } } + pub fn read_buffered(&mut self, buf: &mut [u8]) -> Result { + if !self.buffer.is_empty() { + let bytes_to_read = buf.len().min(self.buffer.len()); + assert!(buf.len() >= bytes_to_read); + assert!(self.buffer.len() >= bytes_to_read); + for b in buf.iter_mut().take(bytes_to_read) { + *b = self.buffer.pop_front().unwrap(); // Safe unwrap as bounds was checked previously + } + } else { + match self.inner.read() { + Ok(Message::Binary(data)) => { + let bytes_to_read = buf.len().min(data.len()); + buf.copy_from_slice(&data[..bytes_to_read]); + for rest in data[bytes_to_read..].iter() { + self.buffer.push_back(*rest); + } + } + _ => return Err(std::io::Error::other("Invalid message type")), + } + } + + Ok(buf.len()) + } +} + +impl Transport for WebSocket +where + T: Read + Write, +{ + fn transport_read(&mut self, buf: &mut [u8]) -> Result { + self.read_buffered(buf) + } + fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> { - self.write(Message::Binary(buf.to_vec())) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - self.flush().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + self.inner + .write(Message::Binary(buf.to_vec())) + .map_err(std::io::Error::other)?; + self.inner.flush().map_err(std::io::Error::other) } fn transport_shutdown(&mut self) -> Result<(), std::io::Error> { - self.close(None).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + self.inner.close(None).map_err(std::io::Error::other)?; loop { - match self.read() { + match self.inner.read() { Ok(_) => continue, Err(tungstenite::Error::ConnectionClosed) => break, - Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)), + Err(e) => return Err(std::io::Error::other(e)), } } Ok(()) } -} \ No newline at end of file + + fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> { + let mut total_read = 0; + while total_read < buf.len() { + total_read += self.read_buffered(&mut buf[total_read..])?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::net::TcpListener; + + use super::*; + + #[test] + fn websocket_read_buffered() { + let jh = std::thread::spawn(|| { + let server = TcpListener::bind("127.0.0.1:51232").unwrap(); + let stream = server.incoming().next().unwrap().unwrap(); + let mut websocket = tungstenite::accept(stream).unwrap(); + websocket + .send(tungstenite::Message::binary([1, 2, 3, 4])) + .unwrap(); + websocket + .send(tungstenite::Message::binary([5, 6, 7, 8])) + .unwrap(); + }); + + let (websocket, _) = tungstenite::connect("ws://127.0.0.1:51232").unwrap(); + let mut websocket = WebSocket::new(websocket); + + let mut buf = [0u8; 2]; + assert_eq!(websocket.read_buffered(&mut buf).unwrap(), 2); + assert_eq!(buf, [1, 2]); + assert_eq!(websocket.read_buffered(&mut buf).unwrap(), 2); + assert_eq!(buf, [3, 4]); + + let mut buf = [0u8; 4]; + assert_eq!(websocket.read_buffered(&mut buf).unwrap(), 4); + assert_eq!(buf, [5, 6, 7, 8]); + + let _ = websocket.transport_shutdown(); + + jh.join().unwrap(); + } + + #[test] + fn websocket_read_exact() { + let jh = std::thread::spawn(|| { + let server = TcpListener::bind("127.0.0.1:51234").unwrap(); + let stream = server.incoming().next().unwrap().unwrap(); + let mut websocket = tungstenite::accept(stream).unwrap(); + websocket + .send(tungstenite::Message::binary([1, 2, 3])) + .unwrap(); + }); + + let (websocket, _) = tungstenite::connect("ws://127.0.0.1:51234").unwrap(); + let mut websocket = WebSocket::new(websocket); + + fn read_exact(stream: &mut T) { + let mut buf = [0u8; 3]; + stream.transport_read_exact(&mut buf).unwrap(); + assert_eq!(buf, [1, 2, 3]); + } + + read_exact(&mut websocket); + + let _ = websocket.transport_shutdown(); + + jh.join().unwrap(); + } +} diff --git a/senders/terminal/video_playlist_example.json b/senders/terminal/video_playlist_example.json new file mode 100644 index 0000000..b0d90b7 --- /dev/null +++ b/senders/terminal/video_playlist_example.json @@ -0,0 +1,14 @@ +{ + "contentType": 0, + "items": [ + { + "container": "video/webm", + "url": "https://upload.wikimedia.org/wikipedia/commons/7/70/EVEREST.webm" + }, + { + "container": "image/png", + "url": "https://upload.wikimedia.org/wikipedia/commons/thumb/6/62/NTV7_Testpattern.png/640px-NTV7_Testpattern.png" + } + ], + "offset": 0 +}