From 91a2bbee13c3945c163b1edec5ef8ad7ca6ea729 Mon Sep 17 00:00:00 2001 From: Marcus Hanestad Date: Wed, 4 Jun 2025 12:20:25 +0200 Subject: [PATCH] rs-terminal: support v3 PlayMessage and event subscriptions --- senders/terminal/src/fcastsession.rs | 137 ++++++++++++++++++++++++--- senders/terminal/src/main.rs | 62 ++++++++---- senders/terminal/src/models/v3.rs | 20 +++- 3 files changed, 184 insertions(+), 35 deletions(-) diff --git a/senders/terminal/src/fcastsession.rs b/senders/terminal/src/fcastsession.rs index 40a9c82..b83462d 100644 --- a/senders/terminal/src/fcastsession.rs +++ b/senders/terminal/src/fcastsession.rs @@ -1,18 +1,30 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; use crate::{ - models::v2::PlaybackUpdateMessage, - models::{PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage}, + models::{ + v2::{self, PlaybackUpdateMessage}, + v3, PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage, + }, transport::Transport, }; use serde::Serialize; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] +enum ProtoVersion { + V2, + V3, +} + +#[derive(Debug, PartialEq, Eq)] enum SessionState { - Idle = 0, + Idle, + Connected(ProtoVersion), Disconnected, } @@ -32,7 +44,6 @@ pub enum Opcode { Version = 11, Ping = 12, Pong = 13, - Initial = 14, PlayUpdate = 15, SetPlaylistItem = 16, @@ -58,7 +69,6 @@ impl Opcode { 11 => Opcode::Version, 12 => Opcode::Ping, 13 => Opcode::Pong, - 14 => Opcode::Initial, 15 => Opcode::PlayUpdate, 16 => Opcode::SetPlaylistItem, @@ -103,10 +113,25 @@ impl<'a> FCastSession<'a> { } let msg: VersionMessage = - serde_json::from_str(&body.ok_or("Version messages required body".to_owned())?)?; + serde_json::from_str(&body.ok_or("Version requires body".to_owned())?)?; if msg.version == 3 { - todo!("Send/recv initial message"); + let initial = v3::InitialSenderMessage { + display_name: None, + app_name: Some(env!("CARGO_PKG_NAME").to_owned()), + app_version: Some(env!("CARGO_PKG_VERSION").to_owned()), + }; + session.send_message(Opcode::Initial, initial)?; + let (opcode, body) = session.read_packet()?; + if opcode != Opcode::Initial { + return Err(format!("Expected Opcode::Initial, got {opcode:?}").into()); + } + let inital_receiver: v3::InitialReceiverMessage = + serde_json::from_str(&body.ok_or("InitialReceiverMessage requires body")?)?; + dbg!(inital_receiver); + session.state = SessionState::Connected(ProtoVersion::V3); + } else { + session.state = SessionState::Connected(ProtoVersion::V2); } Ok(session) @@ -175,6 +200,32 @@ impl<'a> FCastSession<'a> { Ok(()) } + pub fn subscribe(&mut self, event: v3::EventType) -> Result<(), Box> { + if self.state != SessionState::Connected(ProtoVersion::V3) { + return Err(format!( + "Cannot subscribe to events in the current state ({:?})", + self.state + ).into()); + } + + let obj = match event { + v3::EventType::MediaItemStart => v3::EventSubscribeObject::MediaItemStart, + v3::EventType::MediaItemEnd => v3::EventSubscribeObject::MediaItemEnd, + v3::EventType::MediaItemChange => v3::EventSubscribeObject::MediaItemChanged, + v3::EventType::KeyDown => v3::EventSubscribeObject::KeyDown { + keys: v3::KeyNames::all(), + }, + v3::EventType::KeyUp => v3::EventSubscribeObject::KeyUp { + keys: v3::KeyNames::all(), + }, + }; + + self.send_message( + Opcode::SubscribeEvent, + v3::SubscribeEventMessage { event: obj }, + ) + } + pub fn send_empty(&mut self, opcode: Opcode) -> Result<(), Box> { let json = String::new(); let data = json.as_bytes(); @@ -202,6 +253,46 @@ impl<'a> FCastSession<'a> { Ok(()) } + pub fn send_play_message( + &mut self, + mime_type: String, + url: Option, + content: Option, + time: Option, + speed: Option, + headers: Option>, + ) -> Result<(), Box> { + match self.state { + SessionState::Connected(ProtoVersion::V2) => { + let msg = v2::PlayMessage { + container: mime_type, + url, + content, + time, + speed, + headers, + }; + self.send_message(Opcode::Play, msg)?; + } + SessionState::Connected(ProtoVersion::V3) => { + let msg = v3::PlayMessage { + container: mime_type, + url, + content, + time, + volume: Some(1.0), + speed, + headers, + metadata: None, + }; + self.send_message(Opcode::Play, msg)?; + } + _ => return Err("invalid state for sending play message".into()), + } + + Ok(()) + } + fn handle_packet( &mut self, opcode: Opcode, @@ -267,6 +358,30 @@ impl<'a> FCastSession<'a> { self.send_empty(Opcode::Pong)?; println!("Sent pong"); } + Opcode::PlayUpdate => { + if let Some(body_str) = body { + if let Ok(play_update_msg) = + serde_json::from_str::(&body_str) + { + println!("Received play update {play_update_msg:?}"); + } else { + println!("Received play update with malformed body."); + } + } else { + println!("Received play update with no body."); + } + } + Opcode::Event => { + if let Some(body_str) = body { + if let Ok(event_msg) = serde_json::from_str::(&body_str) { + println!("Received event {event_msg:?}"); + } else { + println!("Received event with malformed body."); + } + } else { + println!("Received event with no body."); + } + } _ => { println!("Error handling packet"); } diff --git a/senders/terminal/src/main.rs b/senders/terminal/src/main.rs index 59fdbca..3f9042d 100644 --- a/senders/terminal/src/main.rs +++ b/senders/terminal/src/main.rs @@ -1,4 +1,5 @@ use clap::{App, Arg, SubCommand}; +use fcast::models::v3; use fcast::transport::WebSocket; use std::collections::HashMap; use std::net::IpAddr; @@ -17,7 +18,6 @@ use url::Url; use fcast::fcastsession::Opcode; use fcast::{ fcastsession::FCastSession, - models::v2::PlayMessage, models::{SeekMessage, SetSpeedMessage, SetVolumeMessage}, }; @@ -59,6 +59,16 @@ fn run() -> Result<(), Box> { .required(false) .takes_value(true), ) + .arg( + Arg::with_name("subscribe") + .short('s') + .long("subscribe") + .value_name("SUBSCRIPTIONS") + .help("A comma separated list of events to subscribe to (e.g. MediaItemStart,KeyDown). \ + Available events: [MediaItemStart, MediaItemEnd, MediaItemChange, KeyDown, KeyUp]") + .required(false) + .takes_value(true), + ) .subcommand( SubCommand::with_name("play") .about("Play media") @@ -213,6 +223,25 @@ fn run() -> Result<(), Box> { println!("Connection established."); + if let Some(subscriptions) = matches.value_of("subscribe") { + let subs = subscriptions.split(','); + for sub in subs { + let event = match sub.to_lowercase().as_str() { + "mediaitemstart" => v3::EventType::MediaItemStart, + "mediaitemend" => v3::EventType::MediaItemEnd, + "mediaitemchange" => v3::EventType::MediaItemChange, + "keydown" => v3::EventType::KeyDown, + "keyup" => v3::EventType::KeyUp, + _ => { + println!("Invalid event in subscriptions list: {sub}"); + continue; + } + }; + session.subscribe(event)?; + println!("Subscribed to {event:?} events"); + } + } + let mut join_handle: Option>> = None; if let Some(play_matches) = matches.subcommand_matches("play") { let file_path = play_matches.value_of("file"); @@ -257,7 +286,11 @@ fn run() -> Result<(), Box> { .collect::>() }); - let mut play_message = if let Some(file_path) = file_path { + #[allow(unused_assignments)] + let mut url = None; + let mut content = None; + + if let Some(file_path) = file_path { match local_ip { Some(lip) => { let running = Arc::new(AtomicBool::new(true)); @@ -274,36 +307,25 @@ fn run() -> Result<(), Box> { println!("Waiting for Ctrl+C..."); let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?; - let url = result.0; + url = Some(result.0); join_handle = Some(result.1); - - PlayMessage::new(mime_type, Some(url), None, time, speed, headers) } _ => return Err("Local IP was not able to be resolved.".into()), } } else { - PlayMessage::new( - mime_type, - play_matches.value_of("url").map(|s| s.to_owned()), - play_matches.value_of("content").map(|s| s.to_owned()), - time, - speed, - headers, - ) - }; + url = play_matches.value_of("url").map(|s| s.to_owned()); + content = play_matches.value_of("content").map(|s| s.to_owned()); + } - if play_message.content.is_none() && play_message.url.is_none() { + if content.is_none() && url.is_none() { println!("Reading content from stdin..."); let mut buffer = String::new(); std::io::stdin().read_to_string(&mut buffer)?; - play_message.content = Some(buffer); + content = Some(buffer); } - let json = serde_json::to_string(&play_message); - println!("Sent play {:?}", json); - - session.send_message(Opcode::Play, &play_message)?; + session.send_play_message(mime_type, url, content, time, speed, headers)?; } else if let Some(seek_matches) = matches.subcommand_matches("seek") { let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") { Some(s) => s.parse::()?, diff --git a/senders/terminal/src/models/v3.rs b/senders/terminal/src/models/v3.rs index 1d24374..10fe448 100644 --- a/senders/terminal/src/models/v3.rs +++ b/senders/terminal/src/models/v3.rs @@ -218,6 +218,18 @@ pub enum KeyNames { Enter, } +impl KeyNames { + pub fn all() -> Vec { + vec![ + "ArrowLeft".to_owned(), + "ArrowRight".to_owned(), + "ArrowUp".to_owned(), + "ArrowDown".to_owned(), + "Enter".to_owned(), + ] + } +} + #[derive(Debug, PartialEq)] pub enum EventSubscribeObject { MediaItemStart, @@ -292,14 +304,14 @@ impl<'de> Deserialize<'de> for EventSubscribeObject { } } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] pub struct SubscribeEventMessage { - event: EventSubscribeObject, + pub event: EventSubscribeObject, } -#[derive(Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug)] pub struct UnsubscribeEventMessage { - event: EventSubscribeObject, + pub event: EventSubscribeObject, } #[derive(Debug, PartialEq, Copy, Clone)]