1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-06-24 21:25:23 +00:00

rs-terminal: support v3 PlayMessage and event subscriptions

This commit is contained in:
Marcus Hanestad 2025-06-04 12:20:25 +02:00
parent 3a7b7675ba
commit 91a2bbee13
3 changed files with 184 additions and 35 deletions

View file

@ -1,18 +1,30 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use crate::{
models::v2::PlaybackUpdateMessage,
models::{PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage},
models::{
v2::{self, PlaybackUpdateMessage},
v3, PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage,
},
transport::Transport,
};
use serde::Serialize;
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
enum ProtoVersion {
V2,
V3,
}
#[derive(Debug, PartialEq, Eq)]
enum SessionState {
Idle = 0,
Idle,
Connected(ProtoVersion),
Disconnected,
}
@ -32,7 +44,6 @@ pub enum Opcode {
Version = 11,
Ping = 12,
Pong = 13,
Initial = 14,
PlayUpdate = 15,
SetPlaylistItem = 16,
@ -58,7 +69,6 @@ impl Opcode {
11 => Opcode::Version,
12 => Opcode::Ping,
13 => Opcode::Pong,
14 => Opcode::Initial,
15 => Opcode::PlayUpdate,
16 => Opcode::SetPlaylistItem,
@ -103,10 +113,25 @@ impl<'a> FCastSession<'a> {
}
let msg: VersionMessage =
serde_json::from_str(&body.ok_or("Version messages required body".to_owned())?)?;
serde_json::from_str(&body.ok_or("Version requires body".to_owned())?)?;
if msg.version == 3 {
todo!("Send/recv initial message");
let initial = v3::InitialSenderMessage {
display_name: None,
app_name: Some(env!("CARGO_PKG_NAME").to_owned()),
app_version: Some(env!("CARGO_PKG_VERSION").to_owned()),
};
session.send_message(Opcode::Initial, initial)?;
let (opcode, body) = session.read_packet()?;
if opcode != Opcode::Initial {
return Err(format!("Expected Opcode::Initial, got {opcode:?}").into());
}
let inital_receiver: v3::InitialReceiverMessage =
serde_json::from_str(&body.ok_or("InitialReceiverMessage requires body")?)?;
dbg!(inital_receiver);
session.state = SessionState::Connected(ProtoVersion::V3);
} else {
session.state = SessionState::Connected(ProtoVersion::V2);
}
Ok(session)
@ -175,6 +200,32 @@ impl<'a> FCastSession<'a> {
Ok(())
}
pub fn subscribe(&mut self, event: v3::EventType) -> Result<(), Box<dyn std::error::Error>> {
if self.state != SessionState::Connected(ProtoVersion::V3) {
return Err(format!(
"Cannot subscribe to events in the current state ({:?})",
self.state
).into());
}
let obj = match event {
v3::EventType::MediaItemStart => v3::EventSubscribeObject::MediaItemStart,
v3::EventType::MediaItemEnd => v3::EventSubscribeObject::MediaItemEnd,
v3::EventType::MediaItemChange => v3::EventSubscribeObject::MediaItemChanged,
v3::EventType::KeyDown => v3::EventSubscribeObject::KeyDown {
keys: v3::KeyNames::all(),
},
v3::EventType::KeyUp => v3::EventSubscribeObject::KeyUp {
keys: v3::KeyNames::all(),
},
};
self.send_message(
Opcode::SubscribeEvent,
v3::SubscribeEventMessage { event: obj },
)
}
pub fn send_empty(&mut self, opcode: Opcode) -> Result<(), Box<dyn std::error::Error>> {
let json = String::new();
let data = json.as_bytes();
@ -202,6 +253,46 @@ impl<'a> FCastSession<'a> {
Ok(())
}
pub fn send_play_message(
&mut self,
mime_type: String,
url: Option<String>,
content: Option<String>,
time: Option<f64>,
speed: Option<f64>,
headers: Option<HashMap<String, String>>,
) -> Result<(), Box<dyn std::error::Error>> {
match self.state {
SessionState::Connected(ProtoVersion::V2) => {
let msg = v2::PlayMessage {
container: mime_type,
url,
content,
time,
speed,
headers,
};
self.send_message(Opcode::Play, msg)?;
}
SessionState::Connected(ProtoVersion::V3) => {
let msg = v3::PlayMessage {
container: mime_type,
url,
content,
time,
volume: Some(1.0),
speed,
headers,
metadata: None,
};
self.send_message(Opcode::Play, msg)?;
}
_ => return Err("invalid state for sending play message".into()),
}
Ok(())
}
fn handle_packet(
&mut self,
opcode: Opcode,
@ -267,6 +358,30 @@ impl<'a> FCastSession<'a> {
self.send_empty(Opcode::Pong)?;
println!("Sent pong");
}
Opcode::PlayUpdate => {
if let Some(body_str) = body {
if let Ok(play_update_msg) =
serde_json::from_str::<v3::PlayUpdateMessage>(&body_str)
{
println!("Received play update {play_update_msg:?}");
} else {
println!("Received play update with malformed body.");
}
} else {
println!("Received play update with no body.");
}
}
Opcode::Event => {
if let Some(body_str) = body {
if let Ok(event_msg) = serde_json::from_str::<v3::EventMessage>(&body_str) {
println!("Received event {event_msg:?}");
} else {
println!("Received event with malformed body.");
}
} else {
println!("Received event with no body.");
}
}
_ => {
println!("Error handling packet");
}

View file

@ -1,4 +1,5 @@
use clap::{App, Arg, SubCommand};
use fcast::models::v3;
use fcast::transport::WebSocket;
use std::collections::HashMap;
use std::net::IpAddr;
@ -17,7 +18,6 @@ use url::Url;
use fcast::fcastsession::Opcode;
use fcast::{
fcastsession::FCastSession,
models::v2::PlayMessage,
models::{SeekMessage, SetSpeedMessage, SetVolumeMessage},
};
@ -59,6 +59,16 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("subscribe")
.short('s')
.long("subscribe")
.value_name("SUBSCRIPTIONS")
.help("A comma separated list of events to subscribe to (e.g. MediaItemStart,KeyDown). \
Available events: [MediaItemStart, MediaItemEnd, MediaItemChange, KeyDown, KeyUp]")
.required(false)
.takes_value(true),
)
.subcommand(
SubCommand::with_name("play")
.about("Play media")
@ -213,6 +223,25 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
println!("Connection established.");
if let Some(subscriptions) = matches.value_of("subscribe") {
let subs = subscriptions.split(',');
for sub in subs {
let event = match sub.to_lowercase().as_str() {
"mediaitemstart" => v3::EventType::MediaItemStart,
"mediaitemend" => v3::EventType::MediaItemEnd,
"mediaitemchange" => v3::EventType::MediaItemChange,
"keydown" => v3::EventType::KeyDown,
"keyup" => v3::EventType::KeyUp,
_ => {
println!("Invalid event in subscriptions list: {sub}");
continue;
}
};
session.subscribe(event)?;
println!("Subscribed to {event:?} events");
}
}
let mut join_handle: Option<JoinHandle<Result<(), String>>> = None;
if let Some(play_matches) = matches.subcommand_matches("play") {
let file_path = play_matches.value_of("file");
@ -257,7 +286,11 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
.collect::<HashMap<String, String>>()
});
let mut play_message = if let Some(file_path) = file_path {
#[allow(unused_assignments)]
let mut url = None;
let mut content = None;
if let Some(file_path) = file_path {
match local_ip {
Some(lip) => {
let running = Arc::new(AtomicBool::new(true));
@ -274,36 +307,25 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
println!("Waiting for Ctrl+C...");
let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?;
let url = result.0;
url = Some(result.0);
join_handle = Some(result.1);
PlayMessage::new(mime_type, Some(url), None, time, speed, headers)
}
_ => return Err("Local IP was not able to be resolved.".into()),
}
} else {
PlayMessage::new(
mime_type,
play_matches.value_of("url").map(|s| s.to_owned()),
play_matches.value_of("content").map(|s| s.to_owned()),
time,
speed,
headers,
)
};
url = play_matches.value_of("url").map(|s| s.to_owned());
content = play_matches.value_of("content").map(|s| s.to_owned());
}
if play_message.content.is_none() && play_message.url.is_none() {
if content.is_none() && url.is_none() {
println!("Reading content from stdin...");
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;
play_message.content = Some(buffer);
content = Some(buffer);
}
let json = serde_json::to_string(&play_message);
println!("Sent play {:?}", json);
session.send_message(Opcode::Play, &play_message)?;
session.send_play_message(mime_type, url, content, time, speed, headers)?;
} else if let Some(seek_matches) = matches.subcommand_matches("seek") {
let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") {
Some(s) => s.parse::<f64>()?,

View file

@ -218,6 +218,18 @@ pub enum KeyNames {
Enter,
}
impl KeyNames {
pub fn all() -> Vec<String> {
vec![
"ArrowLeft".to_owned(),
"ArrowRight".to_owned(),
"ArrowUp".to_owned(),
"ArrowDown".to_owned(),
"Enter".to_owned(),
]
}
}
#[derive(Debug, PartialEq)]
pub enum EventSubscribeObject {
MediaItemStart,
@ -292,14 +304,14 @@ impl<'de> Deserialize<'de> for EventSubscribeObject {
}
}
#[derive(Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct SubscribeEventMessage {
event: EventSubscribeObject,
pub event: EventSubscribeObject,
}
#[derive(Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct UnsubscribeEventMessage {
event: EventSubscribeObject,
pub event: EventSubscribeObject,
}
#[derive(Debug, PartialEq, Copy, Clone)]