From 6529d91eb23d82db67a200e232a7d6a34ce53064 Mon Sep 17 00:00:00 2001 From: Marcus Hanestad Date: Tue, 3 Jun 2025 16:39:09 +0200 Subject: [PATCH] rs-terminal: init protocol v3 support --- senders/terminal/Cargo.toml | 3 +- senders/terminal/src/fcastsession.rs | 252 +++++----- senders/terminal/src/file_server.rs | 36 -- senders/terminal/src/lib.rs | 3 + senders/terminal/src/main.rs | 423 +++++++++-------- senders/terminal/src/models.rs | 79 ---- senders/terminal/src/models/mod.rs | 54 +++ senders/terminal/src/models/v2.rs | 43 ++ senders/terminal/src/models/v3.rs | 671 +++++++++++++++++++++++++++ senders/terminal/src/transport.rs | 63 ++- 10 files changed, 1165 insertions(+), 462 deletions(-) delete mode 100644 senders/terminal/src/file_server.rs create mode 100644 senders/terminal/src/lib.rs delete mode 100644 senders/terminal/src/models.rs create mode 100644 senders/terminal/src/models/mod.rs create mode 100644 senders/terminal/src/models/v2.rs create mode 100644 senders/terminal/src/models/v3.rs diff --git a/senders/terminal/Cargo.toml b/senders/terminal/Cargo.toml index c134a73..b170021 100644 --- a/senders/terminal/Cargo.toml +++ b/senders/terminal/Cargo.toml @@ -12,4 +12,5 @@ serde_json = "1.0" ctrlc = "3.1.9" tungstenite = { version = "0.21.0" } url = "2.5.0" -tiny_http = "0.12.0" \ No newline at end of file +tiny_http = "0.12.0" +serde_repr = "0.1.20" \ No newline at end of file diff --git a/senders/terminal/src/fcastsession.rs b/senders/terminal/src/fcastsession.rs index 366ab0c..40a9c82 100644 --- a/senders/terminal/src/fcastsession.rs +++ b/senders/terminal/src/fcastsession.rs @@ -1,13 +1,18 @@ -use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; -use crate::{models::{PlaybackUpdateMessage, VolumeUpdateMessage, PlaybackErrorMessage, VersionMessage}, transport::Transport}; +use crate::{ + models::v2::PlaybackUpdateMessage, + models::{PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage}, + transport::Transport, +}; use serde::Serialize; #[derive(Debug)] enum SessionState { Idle = 0, - WaitingForLength, - WaitingForData, Disconnected, } @@ -26,7 +31,14 @@ pub enum Opcode { SetSpeed = 10, Version = 11, Ping = 12, - Pong = 13 + Pong = 13, + + Initial = 14, + PlayUpdate = 15, + SetPlaylistItem = 16, + SubscribeEvent = 17, + UnsubscribeEvent = 18, + Event = 19, } impl Opcode { @@ -46,6 +58,13 @@ impl Opcode { 11 => Opcode::Version, 12 => Opcode::Ping, 13 => Opcode::Pong, + + 14 => Opcode::Initial, + 15 => Opcode::PlayUpdate, + 16 => Opcode::SetPlaylistItem, + 17 => Opcode::SubscribeEvent, + 18 => Opcode::UnsubscribeEvent, + 19 => Opcode::Event, _ => panic!("Unknown value: {}", value), } } @@ -56,26 +75,85 @@ const MAXIMUM_PACKET_LENGTH: usize = 32000; pub struct FCastSession<'a> { buffer: Vec, - bytes_read: usize, - packet_length: usize, stream: Box, - state: SessionState + state: SessionState, } impl<'a> FCastSession<'a> { pub fn new(stream: T) -> Self { - return FCastSession { + Self { buffer: vec![0; MAXIMUM_PACKET_LENGTH], - bytes_read: 0, - packet_length: 0, stream: Box::new(stream), - state: SessionState::Idle + state: SessionState::Idle, } } -} -impl FCastSession<'_> { - pub fn send_message(&mut self, opcode: Opcode, message: T) -> Result<(), Box> { + pub fn connect(stream: T) -> Result> { + let mut session = Self::new(stream); + + session.send_message( + Opcode::Version, + crate::models::VersionMessage { version: 3 }, + )?; + + let (opcode, body) = session.read_packet()?; + + if opcode != Opcode::Version { + return Err(format!("Expected Opcode::Version, got {opcode:?}").into()); + } + + let msg: VersionMessage = + serde_json::from_str(&body.ok_or("Version messages required body".to_owned())?)?; + + if msg.version == 3 { + todo!("Send/recv initial message"); + } + + Ok(session) + } + + fn read_packet(&mut self) -> Result<(Opcode, Option), Box> { + let mut header_buf = [0u8; 5]; + self.stream.transport_read_exact(&mut header_buf)?; + + let opcode = Opcode::from_u8(header_buf[4]); + let body_length = + u32::from_le_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]]) + as usize + - 1; + + if body_length > MAXIMUM_PACKET_LENGTH { + println!( + "Maximum packet length is 32kB, killing stream: {}", + body_length, + ); + + self.stream.transport_shutdown()?; + self.state = SessionState::Disconnected; + return Err(format!( + "Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", + body_length, + ) + .into()); + } + + self.stream + .transport_read_exact(&mut self.buffer[0..body_length])?; + + let body_json = if body_length > 0 { + Some(String::from_utf8(self.buffer[0..body_length].to_vec())?) + } else { + None + }; + + Ok((opcode, body_json)) + } + + pub fn send_message( + &mut self, + opcode: Opcode, + message: T, + ) -> Result<(), Box> { let json = serde_json::to_string(&message)?; let data = json.as_bytes(); let size = 1 + data.len(); @@ -83,9 +161,16 @@ impl FCastSession<'_> { let mut header = vec![0u8; header_size]; header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes()); header[LENGTH_BYTES] = opcode as u8; - + let packet = [header, data.to_vec()].concat(); - println!("Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).", packet.len(), opcode, header_size, data.len(), json); + println!( + "Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).", + packet.len(), + opcode, + header_size, + data.len(), + json + ); self.stream.transport_write(&packet)?; Ok(()) } @@ -97,126 +182,39 @@ impl FCastSession<'_> { let mut header = vec![0u8; LENGTH_BYTES + 1]; header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes()); header[LENGTH_BYTES] = opcode as u8; - + let packet = [header, data.to_vec()].concat(); self.stream.transport_write(&packet)?; Ok(()) } - pub fn receive_loop(&mut self, running: &Arc) -> Result<(), Box> { + pub fn receive_loop( + &mut self, + running: &Arc, + ) -> Result<(), Box> { println!("Start receiving."); - self.state = SessionState::WaitingForLength; - - let mut buffer = [0u8; 1024]; while running.load(Ordering::SeqCst) { - let bytes_read = self.stream.transport_read(&mut buffer)?; - self.process_bytes(&buffer[..bytes_read])?; - } - - self.state = SessionState::Idle; - Ok(()) - } - - fn process_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box> { - if received_bytes.is_empty() { - return Ok(()); - } - - println!("{} bytes received", received_bytes.len()); - - match self.state { - SessionState::WaitingForLength => self.handle_length_bytes(received_bytes)?, - SessionState::WaitingForData => self.handle_packet_bytes(received_bytes)?, - _ => println!("Data received is unhandled in current session state {:?}", self.state), + let (opcode, body) = self.read_packet()?; + self.handle_packet(opcode, body)?; } Ok(()) } - - fn handle_length_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box> { - let bytes_to_read = std::cmp::min(LENGTH_BYTES, received_bytes.len()); - let bytes_remaining = received_bytes.len() - bytes_to_read; - self.buffer[self.bytes_read..self.bytes_read + bytes_to_read] - .copy_from_slice(&received_bytes[..bytes_to_read]); - self.bytes_read += bytes_to_read; - - println!("handleLengthBytes: Read {} bytes from packet", bytes_to_read); - - if self.bytes_read >= LENGTH_BYTES { - self.state = SessionState::WaitingForData; - self.packet_length = u32::from_le_bytes(self.buffer[..LENGTH_BYTES].try_into()?) as usize; - self.bytes_read = 0; - - println!("Packet length header received from: {}", self.packet_length); - - if self.packet_length > MAXIMUM_PACKET_LENGTH { - println!("Maximum packet length is 32kB, killing stream: {}", self.packet_length); - - self.stream.transport_shutdown()?; - self.state = SessionState::Disconnected; - return Err(format!("Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", self.packet_length).into()); - } - - if bytes_remaining > 0 { - println!("{} remaining bytes pushed to handlePacketBytes", bytes_remaining); - - self.handle_packet_bytes(&received_bytes[bytes_to_read..])?; - } - } - - Ok(()) - } - - fn handle_packet_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box> { - let bytes_to_read = std::cmp::min(self.packet_length, received_bytes.len()); - let bytes_remaining = received_bytes.len() - bytes_to_read; - self.buffer[self.bytes_read..self.bytes_read + bytes_to_read] - .copy_from_slice(&received_bytes[..bytes_to_read]); - self.bytes_read += bytes_to_read; - - println!("handlePacketBytes: Read {} bytes from packet", bytes_to_read); - - if self.bytes_read >= self.packet_length { - println!("Packet finished receiving of {} bytes.", self.packet_length); - self.handle_next_packet()?; - - self.state = SessionState::WaitingForLength; - self.packet_length = 0; - self.bytes_read = 0; - - if bytes_remaining > 0 { - println!("{} remaining bytes pushed to handleLengthBytes", bytes_remaining); - self.handle_length_bytes(&received_bytes[bytes_to_read..])?; - } - } - - Ok(()) - } - - fn handle_next_packet(&mut self) -> Result<(), Box> { - println!("Processing packet of {} bytes", self.bytes_read); - - let opcode = Opcode::from_u8(self.buffer[0]); - let packet_length = self.packet_length; - let body = if packet_length > 1 { - Some(std::str::from_utf8(&self.buffer[1..packet_length])?.to_string()) - } else { - None - }; - - println!("Received body: {:?}", body); - self.handle_packet(opcode, body) - } - - fn handle_packet(&mut self, opcode: Opcode, body: Option) -> Result<(), Box> { + fn handle_packet( + &mut self, + opcode: Opcode, + body: Option, + ) -> Result<(), Box> { println!("Received message with opcode {:?}.", opcode); match opcode { Opcode::PlaybackUpdate => { if let Some(body_str) = body { - if let Ok(playback_update_msg) = serde_json::from_str::(body_str.as_str()) { + if let Ok(playback_update_msg) = + serde_json::from_str::(body_str.as_str()) + { println!("Received playback update {:?}", playback_update_msg); } else { println!("Received playback update with malformed body."); @@ -227,7 +225,9 @@ impl FCastSession<'_> { } Opcode::VolumeUpdate => { if let Some(body_str) = body { - if let Ok(volume_update_msg) = serde_json::from_str::(body_str.as_str()) { + if let Ok(volume_update_msg) = + serde_json::from_str::(body_str.as_str()) + { println!("Received volume update {:?}", volume_update_msg); } else { println!("Received volume update with malformed body."); @@ -238,7 +238,9 @@ impl FCastSession<'_> { } Opcode::PlaybackError => { if let Some(body_str) = body { - if let Ok(playback_error_msg) = serde_json::from_str::(body_str.as_str()) { + if let Ok(playback_error_msg) = + serde_json::from_str::(body_str.as_str()) + { println!("Received playback error {:?}", playback_error_msg); } else { println!("Received playback error with malformed body."); @@ -249,7 +251,9 @@ impl FCastSession<'_> { } Opcode::Version => { if let Some(body_str) = body { - if let Ok(version_msg) = serde_json::from_str::(body_str.as_str()) { + if let Ok(version_msg) = + serde_json::from_str::(body_str.as_str()) + { println!("Received version {:?}", version_msg); } else { println!("Received version with malformed body."); @@ -272,6 +276,6 @@ impl FCastSession<'_> { } pub fn shutdown(&mut self) -> Result<(), std::io::Error> { - return self.stream.transport_shutdown(); + self.stream.transport_shutdown() } -} \ No newline at end of file +} diff --git a/senders/terminal/src/file_server.rs b/senders/terminal/src/file_server.rs deleted file mode 100644 index 2c9547b..0000000 --- a/senders/terminal/src/file_server.rs +++ /dev/null @@ -1,36 +0,0 @@ -struct FileServer { - base_url: String, - base_path: String, -} - -impl FileServer { - fn new(base_url: String, base_path: String) -> Self { - FileServer { base_url, base_path } - } - - async fn serve(&self) { - let file_server = warp::fs::dir(&self.base_path); - warp::serve(file_server).run(([127, 0, 0, 1], 3030)).await; - } - - fn get_url(&self, file_name: &str) -> String { - format!("{}/{}", self.base_url, file_name) - } -} - -pub async fn host_file_and_get_url(file_path: &str) -> Result> { - let file_name = Path::new(file_path).file_name().ok_or("Invalid file path")?.to_str().ok_or("Invalid file name")?; - let file_server = FileServer::new("http://127.0.0.1:3030".to_string(), "path/to/hosted/files".to_string()); - - // Copy the file to the hosting directory - let destination = Path::new(&file_server.base_path).join(file_name); - tokio::fs::copy(file_path, &destination).await?; - - // Start the server if not already running - // This part needs synchronization in a real-world scenario - tokio::spawn(async move { - file_server.serve().await; - }); - - Ok(file_server.get_url(file_name)) -} \ No newline at end of file diff --git a/senders/terminal/src/lib.rs b/senders/terminal/src/lib.rs new file mode 100644 index 0000000..b21a5b8 --- /dev/null +++ b/senders/terminal/src/lib.rs @@ -0,0 +1,3 @@ +pub mod fcastsession; +pub mod models; +pub mod transport; diff --git a/senders/terminal/src/main.rs b/senders/terminal/src/main.rs index 8662edc..db43cf5 100644 --- a/senders/terminal/src/main.rs +++ b/senders/terminal/src/main.rs @@ -1,27 +1,26 @@ -mod models; -mod fcastsession; -mod transport; - use clap::{App, Arg, SubCommand}; -use tiny_http::{Server, Response, ListenAddr, Header}; -use tungstenite::stream::MaybeTlsStream; -use url::Url; use std::collections::HashMap; use std::net::IpAddr; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; use std::thread::JoinHandle; -use std::{thread, fs}; use std::time::Instant; +use std::{fs, thread}; use std::{io::Read, net::TcpStream}; -use std::sync::atomic::{ AtomicBool, Ordering}; use std::{sync::Arc, time::Duration}; +use tiny_http::{Header, ListenAddr, Response, Server}; +use tungstenite::stream::MaybeTlsStream; +use url::Url; -use crate::fcastsession::Opcode; -use crate::models::{SetVolumeMessage, SetSpeedMessage}; -use crate::{models::{PlayMessage, SeekMessage}, fcastsession::FCastSession}; +use fcast::fcastsession::Opcode; +use fcast::{ + fcastsession::FCastSession, + models::v2::PlayMessage, + models::{SeekMessage, SetSpeedMessage, SetVolumeMessage}, +}; -fn main() { +fn main() { if let Err(e) = run() { println!("Failed due to error: {}", e) } @@ -30,127 +29,151 @@ fn main() { fn run() -> Result<(), Box> { let app = App::new("Media Control") .about("Control media playback") - .arg(Arg::with_name("connection_type") - .short('c') - .long("connection_type") - .value_name("CONNECTION_TYPE") - .help("Type of connection: tcp or ws (websocket)") - .required(false) - .default_value("tcp") - .takes_value(true)) - .arg(Arg::with_name("host") - .short('h') - .long("host") - .value_name("Host") - .help("The host address to send the command to") - .required(true) - .takes_value(true)) - .arg(Arg::with_name("port") - .short('p') - .long("port") - .value_name("PORT") - .help("The port to send the command to") - .required(false) - .takes_value(true)) - .subcommand(SubCommand::with_name("play") - .about("Play media") - .arg(Arg::with_name("mime_type") - .short('m') - .long("mime_type") - .value_name("MIME_TYPE") - .help("Mime type (e.g., video/mp4)") - .required_unless_present("file") - .takes_value(true) - ) - .arg(Arg::with_name("file") - .short('f') - .long("file") - .value_name("File") - .help("File content to play") - .required(false) - .takes_value(true)) - .arg(Arg::with_name("url") - .short('u') - .long("url") - .value_name("URL") - .help("URL to the content") - .required(false) - .takes_value(true) - ) - .arg(Arg::with_name("content") + .arg( + Arg::with_name("connection_type") .short('c') - .long("content") - .value_name("CONTENT") - .help("The actual content") + .long("connection_type") + .value_name("CONNECTION_TYPE") + .help("Type of connection: tcp or ws (websocket)") .required(false) - .takes_value(true) - ) - .arg(Arg::with_name("timestamp") - .short('t') - .long("timestamp") - .value_name("TIMESTAMP") - .help("Timestamp to start playing") - .required(false) - .default_value("0") - .takes_value(true) - ) - .arg(Arg::with_name("speed") - .short('s') - .long("speed") - .value_name("SPEED") - .help("Factor to multiply playback speed by") - .required(false) - .default_value("1") - .takes_value(true) - ) - .arg(Arg::with_name("header") - .short('H') - .long("header") - .value_name("HEADER") - .help("Custom request headers in key:value format") - .required(false) - .multiple_occurrences(true) - ) + .default_value("tcp") + .takes_value(true), ) - .subcommand(SubCommand::with_name("seek") - .about("Seek to a timestamp") - .arg(Arg::with_name("timestamp") - .short('t') - .long("timestamp") - .value_name("TIMESTAMP") - .help("Timestamp to start playing") - .required(true) - .takes_value(true) - ), + .arg( + Arg::with_name("host") + .short('h') + .long("host") + .value_name("Host") + .help("The host address to send the command to") + .default_value("127.0.0.1") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("port") + .short('p') + .long("port") + .value_name("PORT") + .help("The port to send the command to") + .required(false) + .takes_value(true), + ) + .subcommand( + SubCommand::with_name("play") + .about("Play media") + .arg( + Arg::with_name("mime_type") + .short('m') + .long("mime_type") + .value_name("MIME_TYPE") + .help("Mime type (e.g., video/mp4)") + .required_unless_present("file") + .takes_value(true), + ) + .arg( + Arg::with_name("file") + .short('f') + .long("file") + .value_name("File") + .help("File content to play") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("url") + .short('u') + .long("url") + .value_name("URL") + .help("URL to the content") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("content") + .short('c') + .long("content") + .value_name("CONTENT") + .help("The actual content") + .required(false) + .takes_value(true), + ) + .arg( + Arg::with_name("timestamp") + .short('t') + .long("timestamp") + .value_name("TIMESTAMP") + .help("Timestamp to start playing") + .required(false) + .default_value("0") + .takes_value(true), + ) + .arg( + Arg::with_name("speed") + .short('s') + .long("speed") + .value_name("SPEED") + .help("Factor to multiply playback speed by") + .required(false) + .default_value("1") + .takes_value(true), + ) + .arg( + Arg::with_name("header") + .short('H') + .long("header") + .value_name("HEADER") + .help("Custom request headers in key:value format") + .required(false) + .multiple_occurrences(true), + ), + ) + .subcommand( + SubCommand::with_name("seek") + .about("Seek to a timestamp") + .arg( + Arg::with_name("timestamp") + .short('t') + .long("timestamp") + .value_name("TIMESTAMP") + .help("Timestamp to start playing") + .required(true) + .takes_value(true), + ), ) .subcommand(SubCommand::with_name("pause").about("Pause media")) .subcommand(SubCommand::with_name("resume").about("Resume media")) .subcommand(SubCommand::with_name("stop").about("Stop media")) .subcommand(SubCommand::with_name("listen").about("Listen to incoming events")) - .subcommand(SubCommand::with_name("setvolume").about("Set the volume") - .arg(Arg::with_name("volume") - .short('v') - .long("volume") - .value_name("VOLUME") - .help("Volume level (0-1)") - .required(true) - .takes_value(true))) - .subcommand(SubCommand::with_name("setspeed").about("Set the playback speed") - .arg(Arg::with_name("speed") - .short('s') - .long("speed") - .value_name("SPEED") - .help("Factor to multiply playback speed by") - .required(true) - .takes_value(true)) + .subcommand( + SubCommand::with_name("setvolume") + .about("Set the volume") + .arg( + Arg::with_name("volume") + .short('v') + .long("volume") + .value_name("VOLUME") + .help("Volume level (0-1)") + .required(true) + .takes_value(true), + ), + ) + .subcommand( + SubCommand::with_name("setspeed") + .about("Set the playback speed") + .arg( + Arg::with_name("speed") + .short('s') + .long("speed") + .value_name("SPEED") + .help("Factor to multiply playback speed by") + .required(true) + .takes_value(true), + ), ); let matches = app.get_matches(); - let host = match matches.value_of("host") { - Some(s) => s, - _ => return Err("Host is required.".into()) - }; + let host = matches.value_of("host").expect("host has default value"); let connection_type = matches.value_of("connection_type").unwrap_or("tcp"); @@ -159,8 +182,10 @@ fn run() -> Result<(), Box> { _ => match connection_type { "tcp" => "46899", "ws" => "46898", - _ => return Err("Unknown connection type, cannot automatically determine port.".into()) - } + _ => { + return Err("Unknown connection type, cannot automatically determine port.".into()) + } + }, }; let local_ip: Option; @@ -169,24 +194,23 @@ fn run() -> Result<(), Box> { println!("Connecting via TCP to host={} port={}...", host, port); let stream = TcpStream::connect(format!("{}:{}", host, port))?; local_ip = Some(stream.local_addr()?.ip()); - FCastSession::new(stream) - }, + FCastSession::connect(stream)? + } "ws" => { println!("Connecting via WebSocket to host={} port={}...", host, port); let url = Url::parse(format!("ws://{}:{}", host, port).as_str())?; let (stream, _) = tungstenite::connect(url)?; local_ip = match stream.get_ref() { MaybeTlsStream::Plain(ref stream) => Some(stream.local_addr()?.ip()), - _ => return Err("Established connection type is not plain.".into()) + _ => return Err("Established connection type is not plain.".into()), }; - FCastSession::new(stream) + FCastSession::connect(stream)? } _ => return Err("Invalid connection type.".into()), }; println!("Connection established."); - let mut join_handle: Option>> = None; if let Some(play_matches) = matches.subcommand_matches("play") { let file_path = play_matches.value_of("file"); @@ -197,7 +221,7 @@ fn run() -> Result<(), Box> { if file_path.is_none() { return Err("MIME type is required.".into()); } - match file_path.unwrap().split('.').last() { + match file_path.unwrap().split('.').next_back() { Some("mkv") => "video/x-matroska".to_string(), Some("mov") => "video/quicktime".to_string(), Some("mp4") | Some("m4v") => "video/mp4".to_string(), @@ -210,68 +234,59 @@ fn run() -> Result<(), Box> { let time = match play_matches.value_of("timestamp") { Some(s) => s.parse::().ok(), - _ => None + _ => None, }; let speed = match play_matches.value_of("speed") { Some(s) => s.parse::().ok(), - _ => None + _ => None, }; - let headers = play_matches.values_of("header") - .map(|values| values - .filter_map(|s| { - let mut parts = s.splitn(2, ':'); - if let (Some(key), Some(value)) = (parts.next(), parts.next()) { - Some((key.trim().to_string(), value.trim().to_string())) - } else { - None - } - } - ).collect::>()); + let headers = play_matches.values_of("header").map(|values| { + values + .filter_map(|s| { + let mut parts = s.splitn(2, ':'); + if let (Some(key), Some(value)) = (parts.next(), parts.next()) { + Some((key.trim().to_string(), value.trim().to_string())) + } else { + None + } + }) + .collect::>() + }); let mut play_message = if let Some(file_path) = file_path { match local_ip { Some(lip) => { let running = Arc::new(AtomicBool::new(true)); - let r = running.clone(); + let r = running.clone(); ctrlc::set_handler(move || { - println!("Ctrl+C triggered, server will stop when onging request finishes..."); + println!( + "Ctrl+C triggered, server will stop when onging request finishes..." + ); r.store(false, Ordering::SeqCst); - }).expect("Error setting Ctrl-C handler"); - + }) + .expect("Error setting Ctrl-C handler"); + println!("Waiting for Ctrl+C..."); let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?; let url = result.0; join_handle = Some(result.1); - PlayMessage::new( - mime_type, - Some(url), - None, - time, - speed, - headers - ) - }, - _ => return Err("Local IP was not able to be resolved.".into()) + PlayMessage::new(mime_type, Some(url), None, time, speed, headers) + } + _ => return Err("Local IP was not able to be resolved.".into()), } } else { PlayMessage::new( mime_type, - match play_matches.value_of("url") { - Some(s) => Some(s.to_string()), - _ => None - }, - match play_matches.value_of("content") { - Some(s) => Some(s.to_string()), - _ => None - }, + play_matches.value_of("url").map(|s| s.to_owned()), + play_matches.value_of("content").map(|s| s.to_owned()), time, speed, - headers + headers, ) }; @@ -290,20 +305,20 @@ fn run() -> Result<(), Box> { } else if let Some(seek_matches) = matches.subcommand_matches("seek") { let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") { Some(s) => s.parse::()?, - _ => return Err("Timestamp is required.".into()) + _ => return Err("Timestamp is required.".into()), }); println!("Sent seek {:?}", seek_message); session.send_message(Opcode::Seek, &seek_message)?; - } else if let Some(_) = matches.subcommand_matches("pause") { + } else if matches.subcommand_matches("pause").is_some() { println!("Sent pause"); session.send_empty(Opcode::Pause)?; - } else if let Some(_) = matches.subcommand_matches("resume") { + } else if matches.subcommand_matches("resume").is_some() { println!("Sent resume"); session.send_empty(Opcode::Resume)?; - } else if let Some(_) = matches.subcommand_matches("stop") { + } else if matches.subcommand_matches("stop").is_some() { println!("Sent stop"); session.send_empty(Opcode::Stop)?; - } else if let Some(_) = matches.subcommand_matches("listen") { + } else if matches.subcommand_matches("listen").is_some() { println!("Starter listening to events..."); let running = Arc::new(AtomicBool::new(true)); @@ -312,7 +327,8 @@ fn run() -> Result<(), Box> { ctrlc::set_handler(move || { println!("Ctrl+C triggered..."); r.store(false, Ordering::SeqCst); - }).expect("Error setting Ctrl-C handler"); + }) + .expect("Error setting Ctrl-C handler"); println!("Waiting for Ctrl+C..."); @@ -322,14 +338,14 @@ fn run() -> Result<(), Box> { } else if let Some(setvolume_matches) = matches.subcommand_matches("setvolume") { let setvolume_message = SetVolumeMessage::new(match setvolume_matches.value_of("volume") { Some(s) => s.parse::()?, - _ => return Err("Timestamp is required.".into()) + _ => return Err("Timestamp is required.".into()), }); println!("Sent setvolume {:?}", setvolume_message); session.send_message(Opcode::SetVolume, &setvolume_message)?; } else if let Some(setspeed_matches) = matches.subcommand_matches("setspeed") { let setspeed_message = SetSpeedMessage::new(match setspeed_matches.value_of("speed") { Some(s) => s.parse::()?, - _ => return Err("Speed is required.".into()) + _ => return Err("Speed is required.".into()), }); println!("Sent setspeed {:?}", setspeed_message); session.send_message(Opcode::SetSpeed, &setspeed_message)?; @@ -340,7 +356,7 @@ fn run() -> Result<(), Box> { println!("Waiting on other threads..."); if let Some(v) = join_handle { - if let Err(_) = v.join() { + if v.join().is_err() { return Err("Failed to join thread.".into()); } } @@ -364,19 +380,19 @@ impl ServerState { } } -fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, running: &Arc) -> Result<(String, thread::JoinHandle>), String> { +fn host_file_and_get_url( + local_ip: &IpAddr, + file_path: &str, + mime_type: &String, + running: &Arc, +) -> Result<(String, thread::JoinHandle>), String> { let local_ip_str = if local_ip.is_ipv6() { format!("[{}]", local_ip) } else { format!("{}", local_ip) }; - let server = { - let this = Server::http(format!("{local_ip_str}:0")); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|e| format!("Failed to create server: {}", e))(e)), - } - }?; + let server = Server::http(format!("{local_ip_str}:0")) + .map_err(|err| format!("Failed to create server: {err}"))?; let url = match server.server_addr() { ListenAddr::IP(addr) => format!("http://{local_ip_str}:{}/", addr.port()), @@ -399,14 +415,9 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, } let should_break = { - let state = { - let this = state.lock(); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|e| format!("Mutex error: {}", e))(e)), - } - }?; - state.active_connections == 0 && state.last_request_time.elapsed() > Duration::from_secs(300) + let state = state.lock().unwrap(); + state.active_connections == 0 + && state.last_request_time.elapsed() > Duration::from_secs(300) }; if should_break { @@ -418,34 +429,18 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, Ok(Some(request)) => { println!("Request received."); - let mut state = { - let this = state.lock(); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|e| format!("Mutex error: {}", e))(e)), - } - }?; + let mut state = state.lock().unwrap(); state.active_connections += 1; state.last_request_time = Instant::now(); - let file = { - let this = fs::File::open(&file_path_clone); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|_| "Failed to open file.".to_string())(e)), - } - }?; + let file = fs::File::open(&file_path_clone) + .map_err(|_| "Failed to open file.".to_owned())?; - let content_type_header = { - let this = Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str()); - match this { - Ok(t) => Ok(t), - Err(e) => Err((|_| "Failed to open file.".to_string())(e)), - } - }?; + let content_type_header = + Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str()) + .map_err(|_| "Failed to open file.".to_owned())?; - let response = Response::from_file(file) - .with_header(content_type_header); + let response = Response::from_file(file).with_header(content_type_header); if let Err(e) = request.respond(response) { println!("Failed to respond to request: {}", e); diff --git a/senders/terminal/src/models.rs b/senders/terminal/src/models.rs deleted file mode 100644 index f3a5d38..0000000 --- a/senders/terminal/src/models.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::collections::HashMap; - -use serde::{Serialize, Deserialize}; - -#[derive(Serialize, Debug)] -pub struct PlayMessage { - pub container: String, - pub url: Option, - pub content: Option, - pub time: Option, - pub speed: Option, - pub headers: Option> -} - -impl PlayMessage { - pub fn new(container: String, url: Option, content: Option, time: Option, speed: Option, headers: Option>) -> Self { - Self { container, url, content, time, speed, headers } - } -} - -#[derive(Serialize, Debug)] -pub struct SeekMessage { - pub time: f64, -} - -impl SeekMessage { - pub fn new(time: f64) -> Self { - Self { time } - } -} - -#[derive(Deserialize, Debug)] -pub struct PlaybackUpdateMessage { - #[serde(rename = "generationTime")] - pub generation_time: u64, - pub time: f64, - pub duration: f64, - pub speed: f64, - pub state: u8 //0 = None, 1 = Playing, 2 = Paused -} - -#[derive(Deserialize, Debug)] -pub struct VolumeUpdateMessage { - #[serde(rename = "generationTime")] - pub generation_time: u64, - pub volume: f64 //(0-1) -} - -#[derive(Serialize, Debug)] -pub struct SetVolumeMessage { - pub volume: f64, -} - -impl SetVolumeMessage { - pub fn new(volume: f64) -> Self { - Self { volume } - } -} - -#[derive(Serialize, Debug)] -pub struct SetSpeedMessage { - pub speed: f64, -} - -impl SetSpeedMessage { - pub fn new(speed: f64) -> Self { - Self { speed } - } -} - -#[derive(Deserialize, Debug)] -pub struct PlaybackErrorMessage { - pub message: String, -} - -#[derive(Deserialize, Debug)] -pub struct VersionMessage { - pub version: u64, -} \ No newline at end of file diff --git a/senders/terminal/src/models/mod.rs b/senders/terminal/src/models/mod.rs new file mode 100644 index 0000000..c563b7c --- /dev/null +++ b/senders/terminal/src/models/mod.rs @@ -0,0 +1,54 @@ +use serde::{Deserialize, Serialize}; + +pub mod v2; +pub mod v3; + +#[derive(Deserialize, Debug)] +pub struct PlaybackErrorMessage { + pub message: String, +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct VersionMessage { + pub version: u64, +} + +#[derive(Serialize, Debug)] +pub struct SetSpeedMessage { + pub speed: f64, +} + +impl SetSpeedMessage { + pub fn new(speed: f64) -> Self { + Self { speed } + } +} + +#[derive(Deserialize, Debug)] +pub struct VolumeUpdateMessage { + #[serde(rename = "generationTime")] + pub generation_time: u64, + pub volume: f64, //(0-1) +} + +#[derive(Serialize, Debug)] +pub struct SetVolumeMessage { + pub volume: f64, +} + +impl SetVolumeMessage { + pub fn new(volume: f64) -> Self { + Self { volume } + } +} + +#[derive(Serialize, Debug)] +pub struct SeekMessage { + pub time: f64, +} + +impl SeekMessage { + pub fn new(time: f64) -> Self { + Self { time } + } +} diff --git a/senders/terminal/src/models/v2.rs b/senders/terminal/src/models/v2.rs new file mode 100644 index 0000000..73843e9 --- /dev/null +++ b/senders/terminal/src/models/v2.rs @@ -0,0 +1,43 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Debug)] +pub struct PlayMessage { + pub container: String, + pub url: Option, + pub content: Option, + pub time: Option, + pub speed: Option, + pub headers: Option>, +} + +impl PlayMessage { + pub fn new( + container: String, + url: Option, + content: Option, + time: Option, + speed: Option, + headers: Option>, + ) -> Self { + Self { + container, + url, + content, + time, + speed, + headers, + } + } +} + +#[derive(Deserialize, Debug)] +pub struct PlaybackUpdateMessage { + #[serde(rename = "generationTime")] + pub generation_time: u64, + pub time: f64, + pub duration: f64, + pub speed: f64, + pub state: u8, //0 = None, 1 = Playing, 2 = Paused +} diff --git a/senders/terminal/src/models/v3.rs b/senders/terminal/src/models/v3.rs new file mode 100644 index 0000000..1d24374 --- /dev/null +++ b/senders/terminal/src/models/v3.rs @@ -0,0 +1,671 @@ +use std::collections::HashMap; + +use serde::{de, Deserialize, Serialize}; +use serde_json::{json, Value}; +use serde_repr::{Deserialize_repr, Serialize_repr}; + +macro_rules! get_from_map { + ($map:expr, $key:expr) => { + $map.get($key).ok_or(de::Error::missing_field($key)) + }; +} + +#[derive(Debug, PartialEq, Clone)] +pub enum MetadataObject { + Generic { + title: Option, + thumbnail_url: Option, + custom: Value, + }, +} + +impl Serialize for MetadataObject { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + MetadataObject::Generic { + title, + thumbnail_url, + custom, + } => { + let mut map = serde_json::Map::new(); + map.insert("type".to_owned(), json!(0u64)); + map.insert( + "title".to_owned(), + match title { + Some(t) => Value::String(t.to_owned()), + None => Value::Null, + }, + ); + map.insert( + "thumbnailUrl".to_owned(), + match thumbnail_url { + Some(t) => Value::String(t.to_owned()), + None => Value::Null, + }, + ); + map.insert("custom".to_owned(), custom.clone()); + map.serialize(serializer) + } + } + } +} + +// TODO: handle errors +impl<'de> Deserialize<'de> for MetadataObject { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let mut map = serde_json::Map::deserialize(deserializer)?; + + let type_ = map + .remove("type") + .ok_or(de::Error::missing_field("type"))? + .as_u64() + .ok_or(de::Error::custom("`type` is not an integer"))?; + let rest = Value::Object(map); + + match type_ { + 0 => Ok(Self::Generic { + title: rest.get("title").map(|v| v.as_str().unwrap().to_owned()), + thumbnail_url: rest + .get("thumbnailUrl") + .map(|v| v.as_str().unwrap().to_owned()), + custom: rest.get("custom").unwrap().clone(), + }), + _ => Err(de::Error::custom(format!("Unknown metadata type {type_}"))), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct PlayMessage { + /// The MIME type (video/mp4) + pub container: String, + // The URL to load (optional) + pub url: Option, + // The content to load (i.e. a DASH manifest, json content, optional) + pub content: Option, + // The time to start playing in seconds + pub time: Option, + // The desired volume (0-1) + pub volume: Option, + // The factor to multiply playback speed by (defaults to 1.0) + pub speed: Option, + // HTTP request headers to add to the play request Map + pub headers: Option>, + pub metadata: Option, +} + +#[derive(Deserialize_repr, Serialize_repr, Debug)] +#[repr(u8)] +pub enum ContentType { + Playlist = 0, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct MediaItem { + /// The MIME type (video/mp4) + pub container: String, + /// The URL to load (optional) + pub url: Option, + /// The content to load (i.e. a DASH manifest, json content, optional) + pub content: Option, + /// The time to start playing in seconds + pub time: Option, + /// The desired volume (0-1) + pub volume: Option, + /// The factor to multiply playback speed by (defaults to 1.0) + pub speed: Option, + /// Indicates if the receiver should preload the media item + pub cache: Option, + /// Indicates how long the item content is presented on screen in seconds + pub showDuration: Option, + /// HTTP request headers to add to the play request Map + pub headers: Option>, + pub metadata: Option, +} + +#[derive(Serialize, Debug)] +pub struct PlaylistContent { + #[serde(rename = "type")] + variant: ContentType, + pub items: Vec, + /// Start position of the first item to play from the playlist + pub offset: Option, // int or float? + /// The desired volume (0-1) + pub volume: Option, + /// The factor to multiply playback speed by (defaults to 1.0) + pub speed: Option, + /// Count of media items should be pre-loaded forward from the current view index + #[serde(rename = "forwardCache")] + pub forward_cache: Option, + /// Count of media items should be pre-loaded backward from the current view index + #[serde(rename = "backwardCache")] + pub backward_cache: Option, + pub metadata: Option, +} + +#[derive(Serialize_repr, Debug)] +#[repr(u8)] +pub enum PlaybackState { + Idle = 0, + Playing = 1, + Paused = 2, +} + +#[derive(Serialize, Debug)] +pub struct PlaybackUpdateMessage { + // The time the packet was generated (unix time milliseconds) + pub generationTime: u64, + // The playback state + pub state: PlaybackState, + // The current time playing in seconds + pub time: Option, + // The duration in seconds + pub duration: Option, + // The playback speed factor + pub speed: Option, + // The playlist item index currently being played on receiver + pub itemIndex: Option, +} + +#[derive(Serialize, Debug)] +pub struct InitialSenderMessage { + #[serde(rename = "displayName")] + pub display_name: Option, + #[serde(rename = "appName")] + pub app_name: Option, + #[serde(rename = "appVersion")] + pub app_version: Option, +} + +#[derive(Deserialize, Debug)] +pub struct InitialReceiverMessage { + #[serde(rename = "displayName")] + pub display_name: Option, + #[serde(rename = "appName")] + pub app_name: Option, + #[serde(rename = "appVersion")] + pub app_version: Option, + #[serde(rename = "playData")] + pub play_data: Option, +} + +#[derive(Deserialize, Debug)] +pub struct PlayUpdateMessage { + #[serde(rename = "generationTime")] + pub generation_time: Option, + #[serde(rename = "playData")] + pub play_data: Option, +} + +#[derive(Serialize, Debug)] +pub struct SetPlaylistItemMessage { + #[serde(rename = "itemIndex")] + pub item_index: Option, +} + +#[derive(Deserialize, Debug)] +pub enum KeyNames { + ArrowLeft, + ArrowRight, + ArrowUp, + ArrowDown, + Enter, +} + +#[derive(Debug, PartialEq)] +pub enum EventSubscribeObject { + MediaItemStart, + MediaItemEnd, + MediaItemChanged, + KeyDown { keys: Vec }, + KeyUp { keys: Vec }, +} + +impl Serialize for EventSubscribeObject { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serde_json::Map::new(); + let type_val: u64 = match self { + EventSubscribeObject::MediaItemStart => 0, + EventSubscribeObject::MediaItemEnd => 1, + EventSubscribeObject::MediaItemChanged => 2, + EventSubscribeObject::KeyDown { .. } => 3, + EventSubscribeObject::KeyUp { .. } => 4, + }; + + map.insert("type".to_owned(), json!(type_val)); + + let keys = match self { + EventSubscribeObject::KeyDown { keys } => Some(keys), + EventSubscribeObject::KeyUp { keys } => Some(keys), + _ => None, + }; + if let Some(keys) = keys { + map.insert("keys".to_owned(), json!(keys)); + } + + map.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for EventSubscribeObject { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let mut map = serde_json::Map::deserialize(deserializer)?; + let type_ = map + .remove("type") + .ok_or(de::Error::missing_field("type"))? + .as_u64() + .ok_or(de::Error::custom("`type` is not an integer"))?; + let rest = Value::Object(map); + + match type_ { + 0 => Ok(Self::MediaItemStart), + 1 => Ok(Self::MediaItemEnd), + 2 => Ok(Self::MediaItemChanged), + 3 | 4 => { + let keys = get_from_map!(rest, "keys")? + .as_array() + .ok_or(de::Error::custom("`type` is not an array"))? + .iter() + .map(|v| v.as_str().map(|s| s.to_owned())) + .collect::>>() + .ok_or(de::Error::custom("`type` is not an array of strings"))?; + if type_ == 3 { + Ok(Self::KeyDown { keys }) + } else { + Ok(Self::KeyUp { keys }) + } + } + _ => Err(de::Error::custom(format!("Unknown event type {type_}"))), + } + } +} + +#[derive(Deserialize, Debug)] +pub struct SubscribeEventMessage { + event: EventSubscribeObject, +} + +#[derive(Deserialize, Debug)] +pub struct UnsubscribeEventMessage { + event: EventSubscribeObject, +} + +#[derive(Debug, PartialEq, Copy, Clone)] +#[repr(u8)] +pub enum EventType { + MediaItemStart = 0, + MediaItemEnd = 1, + MediaItemChange = 2, + KeyDown = 3, + KeyUp = 4, +} + +#[derive(Debug, PartialEq)] +#[allow(clippy::large_enum_variant)] +pub enum EventObject { + MediaItem { + variant: EventType, + item: MediaItem, + }, + Key { + variant: EventType, + key: String, + repeat: bool, + handled: bool, + }, +} + +impl Serialize for EventObject { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serde_json::Map::new(); + + match self { + EventObject::MediaItem { variant, item } => { + map.insert("type".to_owned(), json!(*variant as u8)); + map.insert("item".to_owned(), serde_json::to_value(item).unwrap()); + } + EventObject::Key { + variant, + key, + repeat, + handled, + } => { + map.insert("type".to_owned(), json!(*variant as u8)); + map.insert("key".to_owned(), serde_json::to_value(key).unwrap()); + map.insert("repeat".to_owned(), serde_json::to_value(repeat).unwrap()); + map.insert("handled".to_owned(), serde_json::to_value(handled).unwrap()); + } + } + + map.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for EventObject { + fn deserialize(deserializer: D) -> Result + where + D: de::Deserializer<'de>, + { + let mut map = serde_json::Map::deserialize(deserializer)?; + let type_ = map + .remove("type") + .ok_or(de::Error::missing_field("type"))? + .as_u64() + .ok_or(de::Error::custom("`type` is not an integer"))?; + let rest = Value::Object(map); + + match type_ { + #[allow(clippy::manual_range_patterns)] + 0 | 1 | 2 => { + let variant = match type_ { + 0 => EventType::MediaItemStart, + 1 => EventType::MediaItemEnd, + _ => EventType::MediaItemChange, + }; + let item = get_from_map!(rest, "item")?; + Ok(Self::MediaItem { + variant, + item: MediaItem::deserialize(item).unwrap(), + }) + } + 3 | 4 => { + let variant = if type_ == 3 { + EventType::KeyDown + } else { + EventType::KeyUp + }; + Ok(Self::Key { + variant, + key: get_from_map!(rest, "key")?.as_str().unwrap().to_owned(), + repeat: get_from_map!(rest, "repeat")?.as_bool().unwrap(), + handled: get_from_map!(rest, "handled")?.as_bool().unwrap(), + }) + } + _ => Err(de::Error::custom(format!("Unknown event type {type_}"))), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct EventMessage { + #[serde(rename = "generationTime")] + pub generation_time: u64, + event: EventObject, +} + +#[cfg(test)] +mod tests { + use super::*; + + macro_rules! s { + ($s:expr) => { + ($s).to_string() + }; + } + + #[test] + fn serialize_metadata_object() { + assert_eq!( + &serde_json::to_string(&MetadataObject::Generic { + title: Some(s!("abc")), + thumbnail_url: Some(s!("def")), + custom: serde_json::Value::Null, + }) + .unwrap(), + r#"{"custom":null,"thumbnailUrl":"def","title":"abc","type":0}"# + ); + assert_eq!( + &serde_json::to_string(&MetadataObject::Generic { + title: None, + thumbnail_url: None, + custom: serde_json::Value::Null, + }) + .unwrap(), + r#"{"custom":null,"thumbnailUrl":null,"title":null,"type":0}"# + ); + } + + #[test] + fn deserialize_metadata_object() { + assert_eq!( + serde_json::from_str::( + r#"{"type":0,"title":"abc","thumbnailUrl":"def","custom":null}"# + ) + .unwrap(), + MetadataObject::Generic { + title: Some(s!("abc")), + thumbnail_url: Some(s!("def")), + custom: serde_json::Value::Null, + } + ); + assert_eq!( + serde_json::from_str::(r#"{"type":0,"custom":null}"#).unwrap(), + MetadataObject::Generic { + title: None, + thumbnail_url: None, + custom: serde_json::Value::Null, + } + ); + assert!(serde_json::from_str::(r#"{"type":1"#).is_err()); + } + + #[test] + fn serialize_event_sub_obj() { + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::MediaItemStart).unwrap(), + r#"{"type":0}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::MediaItemEnd).unwrap(), + r#"{"type":1}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::MediaItemChanged).unwrap(), + r#"{"type":2}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(), + r#"{"keys":[],"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(), + r#"{"keys":[],"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyUp { + keys: vec![s!("abc"), s!("def")] + }) + .unwrap(), + r#"{"keys":["abc","def"],"type":4}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { + keys: vec![s!("abc"), s!("def")] + }) + .unwrap(), + r#"{"keys":["abc","def"],"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventSubscribeObject::KeyDown { + keys: vec![s!("\"\"")] + }) + .unwrap(), + r#"{"keys":["\"\""],"type":3}"# + ); + } + + #[test] + fn deserialize_event_sub_obj() { + assert_eq!( + serde_json::from_str::(r#"{"type":0}"#).unwrap(), + EventSubscribeObject::MediaItemStart + ); + assert_eq!( + serde_json::from_str::(r#"{"type":1}"#).unwrap(), + EventSubscribeObject::MediaItemEnd + ); + assert_eq!( + serde_json::from_str::(r#"{"type":2}"#).unwrap(), + EventSubscribeObject::MediaItemChanged + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":[],"type":3}"#).unwrap(), + EventSubscribeObject::KeyDown { keys: vec![] } + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":[],"type":4}"#).unwrap(), + EventSubscribeObject::KeyUp { keys: vec![] } + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":["abc","def"],"type":3}"#) + .unwrap(), + EventSubscribeObject::KeyDown { + keys: vec![s!("abc"), s!("def")] + } + ); + assert_eq!( + serde_json::from_str::(r#"{"keys":["abc","def"],"type":4}"#) + .unwrap(), + EventSubscribeObject::KeyUp { + keys: vec![s!("abc"), s!("def")] + } + ); + assert!(serde_json::from_str::(r#""type":5}"#).is_err()); + } + + const EMPTY_TEST_MEDIA_ITEM: MediaItem = MediaItem { + container: String::new(), + url: None, + content: None, + time: None, + volume: None, + speed: None, + cache: None, + showDuration: None, + headers: None, + metadata: None, + }; + const TEST_MEDIA_ITEM_JSON: &str = r#"{"cache":null,"container":"","content":null,"headers":null,"metadata":null,"showDuration":null,"speed":null,"time":null,"url":null,"volume":null}"#; + + #[test] + fn serialize_event_obj() { + assert_eq!( + serde_json::to_string(&EventObject::MediaItem { + variant: EventType::MediaItemStart, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + }) + .unwrap(), + format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"#), + ); + assert_eq!( + serde_json::to_string(&EventObject::MediaItem { + variant: EventType::MediaItemEnd, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + }) + .unwrap(), + format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"#), + ); + assert_eq!( + serde_json::to_string(&EventObject::MediaItem { + variant: EventType::MediaItemChange, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + }) + .unwrap(), + format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"#), + ); + assert_eq!( + &serde_json::to_string(&EventObject::Key { + variant: EventType::KeyDown, + key: s!(""), + repeat: false, + handled: false, + }) + .unwrap(), + r#"{"handled":false,"key":"","repeat":false,"type":3}"# + ); + assert_eq!( + &serde_json::to_string(&EventObject::Key { + variant: EventType::KeyUp, + key: s!(""), + repeat: false, + handled: false, + }) + .unwrap(), + r#"{"handled":false,"key":"","repeat":false,"type":4}"# + ); + } + + #[test] + fn deserialize_event_obj() { + assert_eq!( + serde_json::from_str::(&format!( + r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"# + )) + .unwrap(), + EventObject::MediaItem { + variant: EventType::MediaItemStart, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + } + ); + assert_eq!( + serde_json::from_str::(&format!( + r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"# + )) + .unwrap(), + EventObject::MediaItem { + variant: EventType::MediaItemEnd, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + } + ); + assert_eq!( + serde_json::from_str::(&format!( + r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"# + )) + .unwrap(), + EventObject::MediaItem { + variant: EventType::MediaItemChange, + item: EMPTY_TEST_MEDIA_ITEM.clone(), + } + ); + assert_eq!( + serde_json::from_str::( + r#"{"handled":false,"key":"","repeat":false,"type":3}"# + ) + .unwrap(), + EventObject::Key { + variant: EventType::KeyDown, + key: s!(""), + repeat: false, + handled: false, + } + ); + assert_eq!( + serde_json::from_str::( + r#"{"handled":false,"key":"","repeat":false,"type":4}"# + ) + .unwrap(), + EventObject::Key { + variant: EventType::KeyUp, + key: s!(""), + repeat: false, + handled: false, + } + ); + assert!(serde_json::from_str::(r#"{"type":5}"#).is_err()); + } +} diff --git a/senders/terminal/src/transport.rs b/senders/terminal/src/transport.rs index 655e513..8a79c8c 100644 --- a/senders/terminal/src/transport.rs +++ b/senders/terminal/src/transport.rs @@ -1,12 +1,13 @@ use std::io::{Read, Write}; use std::net::TcpStream; -use tungstenite::Message; use tungstenite::protocol::WebSocket; +use tungstenite::Message; pub trait Transport { fn transport_read(&mut self, buf: &mut [u8]) -> Result; fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error>; fn transport_shutdown(&mut self) -> Result<(), std::io::Error>; + fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error>; } impl Transport for TcpStream { @@ -21,6 +22,10 @@ impl Transport for TcpStream { fn transport_shutdown(&mut self) -> Result<(), std::io::Error> { self.shutdown(std::net::Shutdown::Both) } + + fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> { + self.read_exact(buf) + } } impl Transport for WebSocket { @@ -30,27 +35,69 @@ impl Transport for WebSocket { let len = std::cmp::min(buf.len(), data.len()); buf[..len].copy_from_slice(&data[..len]); Ok(len) - }, - _ => Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid message type")) + } + _ => Err(std::io::Error::other("Invalid message type")), } } fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> { self.write(Message::Binary(buf.to_vec())) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - self.flush().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .map_err(std::io::Error::other)?; + self.flush().map_err(std::io::Error::other) } fn transport_shutdown(&mut self) -> Result<(), std::io::Error> { - self.close(None).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + self.close(None).map_err(std::io::Error::other)?; loop { match self.read() { Ok(_) => continue, Err(tungstenite::Error::ConnectionClosed) => break, - Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)), + Err(e) => return Err(std::io::Error::other(e)), } } Ok(()) } -} \ No newline at end of file + + fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> { + let mut total_read = 0; + while total_read < buf.len() { + total_read += self.transport_read(&mut buf[total_read..])?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::net::TcpListener; + + use super::*; + + #[test] + fn websocket_read_exact() { + let jh = std::thread::spawn(|| { + let server = TcpListener::bind("127.0.0.1:51234").unwrap(); + let stream = server.incoming().next().unwrap().unwrap(); + let mut websocket = tungstenite::accept(stream).unwrap(); + websocket + .send(tungstenite::Message::binary([1, 2, 3])) + .unwrap(); + }); + + let (mut websocket, _) = tungstenite::connect("ws://127.0.0.1:51234").unwrap(); + + fn read_exact(stream: &mut T) { + let mut buf = [0u8; 3]; + stream.transport_read_exact(&mut buf).unwrap(); + assert_eq!(buf, [1, 2, 3]); + } + + read_exact(&mut websocket); + + websocket.close(None).unwrap(); + + jh.join().unwrap(); + } +}