1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-06-24 21:25:23 +00:00

rs-terminal: init protocol v3 support

This commit is contained in:
Marcus Hanestad 2025-06-03 16:39:09 +02:00
parent a83f92d874
commit 6529d91eb2
10 changed files with 1165 additions and 462 deletions

View file

@ -12,4 +12,5 @@ serde_json = "1.0"
ctrlc = "3.1.9"
tungstenite = { version = "0.21.0" }
url = "2.5.0"
tiny_http = "0.12.0"
tiny_http = "0.12.0"
serde_repr = "0.1.20"

View file

@ -1,13 +1,18 @@
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use crate::{models::{PlaybackUpdateMessage, VolumeUpdateMessage, PlaybackErrorMessage, VersionMessage}, transport::Transport};
use crate::{
models::v2::PlaybackUpdateMessage,
models::{PlaybackErrorMessage, VersionMessage, VolumeUpdateMessage},
transport::Transport,
};
use serde::Serialize;
#[derive(Debug)]
enum SessionState {
Idle = 0,
WaitingForLength,
WaitingForData,
Disconnected,
}
@ -26,7 +31,14 @@ pub enum Opcode {
SetSpeed = 10,
Version = 11,
Ping = 12,
Pong = 13
Pong = 13,
Initial = 14,
PlayUpdate = 15,
SetPlaylistItem = 16,
SubscribeEvent = 17,
UnsubscribeEvent = 18,
Event = 19,
}
impl Opcode {
@ -46,6 +58,13 @@ impl Opcode {
11 => Opcode::Version,
12 => Opcode::Ping,
13 => Opcode::Pong,
14 => Opcode::Initial,
15 => Opcode::PlayUpdate,
16 => Opcode::SetPlaylistItem,
17 => Opcode::SubscribeEvent,
18 => Opcode::UnsubscribeEvent,
19 => Opcode::Event,
_ => panic!("Unknown value: {}", value),
}
}
@ -56,26 +75,85 @@ const MAXIMUM_PACKET_LENGTH: usize = 32000;
pub struct FCastSession<'a> {
buffer: Vec<u8>,
bytes_read: usize,
packet_length: usize,
stream: Box<dyn Transport + 'a>,
state: SessionState
state: SessionState,
}
impl<'a> FCastSession<'a> {
pub fn new<T: Transport + 'a>(stream: T) -> Self {
return FCastSession {
Self {
buffer: vec![0; MAXIMUM_PACKET_LENGTH],
bytes_read: 0,
packet_length: 0,
stream: Box::new(stream),
state: SessionState::Idle
state: SessionState::Idle,
}
}
}
impl FCastSession<'_> {
pub fn send_message<T: Serialize>(&mut self, opcode: Opcode, message: T) -> Result<(), Box<dyn std::error::Error>> {
pub fn connect<T: Transport + 'a>(stream: T) -> Result<Self, Box<dyn std::error::Error>> {
let mut session = Self::new(stream);
session.send_message(
Opcode::Version,
crate::models::VersionMessage { version: 3 },
)?;
let (opcode, body) = session.read_packet()?;
if opcode != Opcode::Version {
return Err(format!("Expected Opcode::Version, got {opcode:?}").into());
}
let msg: VersionMessage =
serde_json::from_str(&body.ok_or("Version messages required body".to_owned())?)?;
if msg.version == 3 {
todo!("Send/recv initial message");
}
Ok(session)
}
fn read_packet(&mut self) -> Result<(Opcode, Option<String>), Box<dyn std::error::Error>> {
let mut header_buf = [0u8; 5];
self.stream.transport_read_exact(&mut header_buf)?;
let opcode = Opcode::from_u8(header_buf[4]);
let body_length =
u32::from_le_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]])
as usize
- 1;
if body_length > MAXIMUM_PACKET_LENGTH {
println!(
"Maximum packet length is 32kB, killing stream: {}",
body_length,
);
self.stream.transport_shutdown()?;
self.state = SessionState::Disconnected;
return Err(format!(
"Stream killed due to packet length ({}) exceeding maximum 32kB packet size.",
body_length,
)
.into());
}
self.stream
.transport_read_exact(&mut self.buffer[0..body_length])?;
let body_json = if body_length > 0 {
Some(String::from_utf8(self.buffer[0..body_length].to_vec())?)
} else {
None
};
Ok((opcode, body_json))
}
pub fn send_message<T: Serialize>(
&mut self,
opcode: Opcode,
message: T,
) -> Result<(), Box<dyn std::error::Error>> {
let json = serde_json::to_string(&message)?;
let data = json.as_bytes();
let size = 1 + data.len();
@ -83,9 +161,16 @@ impl FCastSession<'_> {
let mut header = vec![0u8; header_size];
header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes());
header[LENGTH_BYTES] = opcode as u8;
let packet = [header, data.to_vec()].concat();
println!("Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).", packet.len(), opcode, header_size, data.len(), json);
println!(
"Sent {} bytes with (opcode: {:?}, header size: {}, body size: {}, body: {}).",
packet.len(),
opcode,
header_size,
data.len(),
json
);
self.stream.transport_write(&packet)?;
Ok(())
}
@ -97,126 +182,39 @@ impl FCastSession<'_> {
let mut header = vec![0u8; LENGTH_BYTES + 1];
header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes());
header[LENGTH_BYTES] = opcode as u8;
let packet = [header, data.to_vec()].concat();
self.stream.transport_write(&packet)?;
Ok(())
}
pub fn receive_loop(&mut self, running: &Arc<AtomicBool>) -> Result<(), Box<dyn std::error::Error>> {
pub fn receive_loop(
&mut self,
running: &Arc<AtomicBool>,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Start receiving.");
self.state = SessionState::WaitingForLength;
let mut buffer = [0u8; 1024];
while running.load(Ordering::SeqCst) {
let bytes_read = self.stream.transport_read(&mut buffer)?;
self.process_bytes(&buffer[..bytes_read])?;
}
self.state = SessionState::Idle;
Ok(())
}
fn process_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
if received_bytes.is_empty() {
return Ok(());
}
println!("{} bytes received", received_bytes.len());
match self.state {
SessionState::WaitingForLength => self.handle_length_bytes(received_bytes)?,
SessionState::WaitingForData => self.handle_packet_bytes(received_bytes)?,
_ => println!("Data received is unhandled in current session state {:?}", self.state),
let (opcode, body) = self.read_packet()?;
self.handle_packet(opcode, body)?;
}
Ok(())
}
fn handle_length_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let bytes_to_read = std::cmp::min(LENGTH_BYTES, received_bytes.len());
let bytes_remaining = received_bytes.len() - bytes_to_read;
self.buffer[self.bytes_read..self.bytes_read + bytes_to_read]
.copy_from_slice(&received_bytes[..bytes_to_read]);
self.bytes_read += bytes_to_read;
println!("handleLengthBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= LENGTH_BYTES {
self.state = SessionState::WaitingForData;
self.packet_length = u32::from_le_bytes(self.buffer[..LENGTH_BYTES].try_into()?) as usize;
self.bytes_read = 0;
println!("Packet length header received from: {}", self.packet_length);
if self.packet_length > MAXIMUM_PACKET_LENGTH {
println!("Maximum packet length is 32kB, killing stream: {}", self.packet_length);
self.stream.transport_shutdown()?;
self.state = SessionState::Disconnected;
return Err(format!("Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", self.packet_length).into());
}
if bytes_remaining > 0 {
println!("{} remaining bytes pushed to handlePacketBytes", bytes_remaining);
self.handle_packet_bytes(&received_bytes[bytes_to_read..])?;
}
}
Ok(())
}
fn handle_packet_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let bytes_to_read = std::cmp::min(self.packet_length, received_bytes.len());
let bytes_remaining = received_bytes.len() - bytes_to_read;
self.buffer[self.bytes_read..self.bytes_read + bytes_to_read]
.copy_from_slice(&received_bytes[..bytes_to_read]);
self.bytes_read += bytes_to_read;
println!("handlePacketBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= self.packet_length {
println!("Packet finished receiving of {} bytes.", self.packet_length);
self.handle_next_packet()?;
self.state = SessionState::WaitingForLength;
self.packet_length = 0;
self.bytes_read = 0;
if bytes_remaining > 0 {
println!("{} remaining bytes pushed to handleLengthBytes", bytes_remaining);
self.handle_length_bytes(&received_bytes[bytes_to_read..])?;
}
}
Ok(())
}
fn handle_next_packet(&mut self) -> Result<(), Box<dyn std::error::Error>> {
println!("Processing packet of {} bytes", self.bytes_read);
let opcode = Opcode::from_u8(self.buffer[0]);
let packet_length = self.packet_length;
let body = if packet_length > 1 {
Some(std::str::from_utf8(&self.buffer[1..packet_length])?.to_string())
} else {
None
};
println!("Received body: {:?}", body);
self.handle_packet(opcode, body)
}
fn handle_packet(&mut self, opcode: Opcode, body: Option<String>) -> Result<(), Box<dyn std::error::Error>> {
fn handle_packet(
&mut self,
opcode: Opcode,
body: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Received message with opcode {:?}.", opcode);
match opcode {
Opcode::PlaybackUpdate => {
if let Some(body_str) = body {
if let Ok(playback_update_msg) = serde_json::from_str::<PlaybackUpdateMessage>(body_str.as_str()) {
if let Ok(playback_update_msg) =
serde_json::from_str::<PlaybackUpdateMessage>(body_str.as_str())
{
println!("Received playback update {:?}", playback_update_msg);
} else {
println!("Received playback update with malformed body.");
@ -227,7 +225,9 @@ impl FCastSession<'_> {
}
Opcode::VolumeUpdate => {
if let Some(body_str) = body {
if let Ok(volume_update_msg) = serde_json::from_str::<VolumeUpdateMessage>(body_str.as_str()) {
if let Ok(volume_update_msg) =
serde_json::from_str::<VolumeUpdateMessage>(body_str.as_str())
{
println!("Received volume update {:?}", volume_update_msg);
} else {
println!("Received volume update with malformed body.");
@ -238,7 +238,9 @@ impl FCastSession<'_> {
}
Opcode::PlaybackError => {
if let Some(body_str) = body {
if let Ok(playback_error_msg) = serde_json::from_str::<PlaybackErrorMessage>(body_str.as_str()) {
if let Ok(playback_error_msg) =
serde_json::from_str::<PlaybackErrorMessage>(body_str.as_str())
{
println!("Received playback error {:?}", playback_error_msg);
} else {
println!("Received playback error with malformed body.");
@ -249,7 +251,9 @@ impl FCastSession<'_> {
}
Opcode::Version => {
if let Some(body_str) = body {
if let Ok(version_msg) = serde_json::from_str::<VersionMessage>(body_str.as_str()) {
if let Ok(version_msg) =
serde_json::from_str::<VersionMessage>(body_str.as_str())
{
println!("Received version {:?}", version_msg);
} else {
println!("Received version with malformed body.");
@ -272,6 +276,6 @@ impl FCastSession<'_> {
}
pub fn shutdown(&mut self) -> Result<(), std::io::Error> {
return self.stream.transport_shutdown();
self.stream.transport_shutdown()
}
}
}

View file

@ -1,36 +0,0 @@
struct FileServer {
base_url: String,
base_path: String,
}
impl FileServer {
fn new(base_url: String, base_path: String) -> Self {
FileServer { base_url, base_path }
}
async fn serve(&self) {
let file_server = warp::fs::dir(&self.base_path);
warp::serve(file_server).run(([127, 0, 0, 1], 3030)).await;
}
fn get_url(&self, file_name: &str) -> String {
format!("{}/{}", self.base_url, file_name)
}
}
pub async fn host_file_and_get_url(file_path: &str) -> Result<String, Box<dyn std::error::Error>> {
let file_name = Path::new(file_path).file_name().ok_or("Invalid file path")?.to_str().ok_or("Invalid file name")?;
let file_server = FileServer::new("http://127.0.0.1:3030".to_string(), "path/to/hosted/files".to_string());
// Copy the file to the hosting directory
let destination = Path::new(&file_server.base_path).join(file_name);
tokio::fs::copy(file_path, &destination).await?;
// Start the server if not already running
// This part needs synchronization in a real-world scenario
tokio::spawn(async move {
file_server.serve().await;
});
Ok(file_server.get_url(file_name))
}

View file

@ -0,0 +1,3 @@
pub mod fcastsession;
pub mod models;
pub mod transport;

View file

@ -1,27 +1,26 @@
mod models;
mod fcastsession;
mod transport;
use clap::{App, Arg, SubCommand};
use tiny_http::{Server, Response, ListenAddr, Header};
use tungstenite::stream::MaybeTlsStream;
use url::Url;
use std::collections::HashMap;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::thread::JoinHandle;
use std::{thread, fs};
use std::time::Instant;
use std::{fs, thread};
use std::{io::Read, net::TcpStream};
use std::sync::atomic::{ AtomicBool, Ordering};
use std::{sync::Arc, time::Duration};
use tiny_http::{Header, ListenAddr, Response, Server};
use tungstenite::stream::MaybeTlsStream;
use url::Url;
use crate::fcastsession::Opcode;
use crate::models::{SetVolumeMessage, SetSpeedMessage};
use crate::{models::{PlayMessage, SeekMessage}, fcastsession::FCastSession};
use fcast::fcastsession::Opcode;
use fcast::{
fcastsession::FCastSession,
models::v2::PlayMessage,
models::{SeekMessage, SetSpeedMessage, SetVolumeMessage},
};
fn main() {
fn main() {
if let Err(e) = run() {
println!("Failed due to error: {}", e)
}
@ -30,127 +29,151 @@ fn main() {
fn run() -> Result<(), Box<dyn std::error::Error>> {
let app = App::new("Media Control")
.about("Control media playback")
.arg(Arg::with_name("connection_type")
.short('c')
.long("connection_type")
.value_name("CONNECTION_TYPE")
.help("Type of connection: tcp or ws (websocket)")
.required(false)
.default_value("tcp")
.takes_value(true))
.arg(Arg::with_name("host")
.short('h')
.long("host")
.value_name("Host")
.help("The host address to send the command to")
.required(true)
.takes_value(true))
.arg(Arg::with_name("port")
.short('p')
.long("port")
.value_name("PORT")
.help("The port to send the command to")
.required(false)
.takes_value(true))
.subcommand(SubCommand::with_name("play")
.about("Play media")
.arg(Arg::with_name("mime_type")
.short('m')
.long("mime_type")
.value_name("MIME_TYPE")
.help("Mime type (e.g., video/mp4)")
.required_unless_present("file")
.takes_value(true)
)
.arg(Arg::with_name("file")
.short('f')
.long("file")
.value_name("File")
.help("File content to play")
.required(false)
.takes_value(true))
.arg(Arg::with_name("url")
.short('u')
.long("url")
.value_name("URL")
.help("URL to the content")
.required(false)
.takes_value(true)
)
.arg(Arg::with_name("content")
.arg(
Arg::with_name("connection_type")
.short('c')
.long("content")
.value_name("CONTENT")
.help("The actual content")
.long("connection_type")
.value_name("CONNECTION_TYPE")
.help("Type of connection: tcp or ws (websocket)")
.required(false)
.takes_value(true)
)
.arg(Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(false)
.default_value("0")
.takes_value(true)
)
.arg(Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(false)
.default_value("1")
.takes_value(true)
)
.arg(Arg::with_name("header")
.short('H')
.long("header")
.value_name("HEADER")
.help("Custom request headers in key:value format")
.required(false)
.multiple_occurrences(true)
)
.default_value("tcp")
.takes_value(true),
)
.subcommand(SubCommand::with_name("seek")
.about("Seek to a timestamp")
.arg(Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(true)
.takes_value(true)
),
.arg(
Arg::with_name("host")
.short('h')
.long("host")
.value_name("Host")
.help("The host address to send the command to")
.default_value("127.0.0.1")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("port")
.short('p')
.long("port")
.value_name("PORT")
.help("The port to send the command to")
.required(false)
.takes_value(true),
)
.subcommand(
SubCommand::with_name("play")
.about("Play media")
.arg(
Arg::with_name("mime_type")
.short('m')
.long("mime_type")
.value_name("MIME_TYPE")
.help("Mime type (e.g., video/mp4)")
.required_unless_present("file")
.takes_value(true),
)
.arg(
Arg::with_name("file")
.short('f')
.long("file")
.value_name("File")
.help("File content to play")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("url")
.short('u')
.long("url")
.value_name("URL")
.help("URL to the content")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("content")
.short('c')
.long("content")
.value_name("CONTENT")
.help("The actual content")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(false)
.default_value("0")
.takes_value(true),
)
.arg(
Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(false)
.default_value("1")
.takes_value(true),
)
.arg(
Arg::with_name("header")
.short('H')
.long("header")
.value_name("HEADER")
.help("Custom request headers in key:value format")
.required(false)
.multiple_occurrences(true),
),
)
.subcommand(
SubCommand::with_name("seek")
.about("Seek to a timestamp")
.arg(
Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(true)
.takes_value(true),
),
)
.subcommand(SubCommand::with_name("pause").about("Pause media"))
.subcommand(SubCommand::with_name("resume").about("Resume media"))
.subcommand(SubCommand::with_name("stop").about("Stop media"))
.subcommand(SubCommand::with_name("listen").about("Listen to incoming events"))
.subcommand(SubCommand::with_name("setvolume").about("Set the volume")
.arg(Arg::with_name("volume")
.short('v')
.long("volume")
.value_name("VOLUME")
.help("Volume level (0-1)")
.required(true)
.takes_value(true)))
.subcommand(SubCommand::with_name("setspeed").about("Set the playback speed")
.arg(Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(true)
.takes_value(true))
.subcommand(
SubCommand::with_name("setvolume")
.about("Set the volume")
.arg(
Arg::with_name("volume")
.short('v')
.long("volume")
.value_name("VOLUME")
.help("Volume level (0-1)")
.required(true)
.takes_value(true),
),
)
.subcommand(
SubCommand::with_name("setspeed")
.about("Set the playback speed")
.arg(
Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(true)
.takes_value(true),
),
);
let matches = app.get_matches();
let host = match matches.value_of("host") {
Some(s) => s,
_ => return Err("Host is required.".into())
};
let host = matches.value_of("host").expect("host has default value");
let connection_type = matches.value_of("connection_type").unwrap_or("tcp");
@ -159,8 +182,10 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
_ => match connection_type {
"tcp" => "46899",
"ws" => "46898",
_ => return Err("Unknown connection type, cannot automatically determine port.".into())
}
_ => {
return Err("Unknown connection type, cannot automatically determine port.".into())
}
},
};
let local_ip: Option<IpAddr>;
@ -169,24 +194,23 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
println!("Connecting via TCP to host={} port={}...", host, port);
let stream = TcpStream::connect(format!("{}:{}", host, port))?;
local_ip = Some(stream.local_addr()?.ip());
FCastSession::new(stream)
},
FCastSession::connect(stream)?
}
"ws" => {
println!("Connecting via WebSocket to host={} port={}...", host, port);
let url = Url::parse(format!("ws://{}:{}", host, port).as_str())?;
let (stream, _) = tungstenite::connect(url)?;
local_ip = match stream.get_ref() {
MaybeTlsStream::Plain(ref stream) => Some(stream.local_addr()?.ip()),
_ => return Err("Established connection type is not plain.".into())
_ => return Err("Established connection type is not plain.".into()),
};
FCastSession::new(stream)
FCastSession::connect(stream)?
}
_ => return Err("Invalid connection type.".into()),
};
println!("Connection established.");
let mut join_handle: Option<JoinHandle<Result<(), String>>> = None;
if let Some(play_matches) = matches.subcommand_matches("play") {
let file_path = play_matches.value_of("file");
@ -197,7 +221,7 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
if file_path.is_none() {
return Err("MIME type is required.".into());
}
match file_path.unwrap().split('.').last() {
match file_path.unwrap().split('.').next_back() {
Some("mkv") => "video/x-matroska".to_string(),
Some("mov") => "video/quicktime".to_string(),
Some("mp4") | Some("m4v") => "video/mp4".to_string(),
@ -210,68 +234,59 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
let time = match play_matches.value_of("timestamp") {
Some(s) => s.parse::<f64>().ok(),
_ => None
_ => None,
};
let speed = match play_matches.value_of("speed") {
Some(s) => s.parse::<f64>().ok(),
_ => None
_ => None,
};
let headers = play_matches.values_of("header")
.map(|values| values
.filter_map(|s| {
let mut parts = s.splitn(2, ':');
if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
Some((key.trim().to_string(), value.trim().to_string()))
} else {
None
}
}
).collect::<HashMap<String, String>>());
let headers = play_matches.values_of("header").map(|values| {
values
.filter_map(|s| {
let mut parts = s.splitn(2, ':');
if let (Some(key), Some(value)) = (parts.next(), parts.next()) {
Some((key.trim().to_string(), value.trim().to_string()))
} else {
None
}
})
.collect::<HashMap<String, String>>()
});
let mut play_message = if let Some(file_path) = file_path {
match local_ip {
Some(lip) => {
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
let r = running.clone();
ctrlc::set_handler(move || {
println!("Ctrl+C triggered, server will stop when onging request finishes...");
println!(
"Ctrl+C triggered, server will stop when onging request finishes..."
);
r.store(false, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler");
})
.expect("Error setting Ctrl-C handler");
println!("Waiting for Ctrl+C...");
let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?;
let url = result.0;
join_handle = Some(result.1);
PlayMessage::new(
mime_type,
Some(url),
None,
time,
speed,
headers
)
},
_ => return Err("Local IP was not able to be resolved.".into())
PlayMessage::new(mime_type, Some(url), None, time, speed, headers)
}
_ => return Err("Local IP was not able to be resolved.".into()),
}
} else {
PlayMessage::new(
mime_type,
match play_matches.value_of("url") {
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("content") {
Some(s) => Some(s.to_string()),
_ => None
},
play_matches.value_of("url").map(|s| s.to_owned()),
play_matches.value_of("content").map(|s| s.to_owned()),
time,
speed,
headers
headers,
)
};
@ -290,20 +305,20 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
} else if let Some(seek_matches) = matches.subcommand_matches("seek") {
let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") {
Some(s) => s.parse::<f64>()?,
_ => return Err("Timestamp is required.".into())
_ => return Err("Timestamp is required.".into()),
});
println!("Sent seek {:?}", seek_message);
session.send_message(Opcode::Seek, &seek_message)?;
} else if let Some(_) = matches.subcommand_matches("pause") {
} else if matches.subcommand_matches("pause").is_some() {
println!("Sent pause");
session.send_empty(Opcode::Pause)?;
} else if let Some(_) = matches.subcommand_matches("resume") {
} else if matches.subcommand_matches("resume").is_some() {
println!("Sent resume");
session.send_empty(Opcode::Resume)?;
} else if let Some(_) = matches.subcommand_matches("stop") {
} else if matches.subcommand_matches("stop").is_some() {
println!("Sent stop");
session.send_empty(Opcode::Stop)?;
} else if let Some(_) = matches.subcommand_matches("listen") {
} else if matches.subcommand_matches("listen").is_some() {
println!("Starter listening to events...");
let running = Arc::new(AtomicBool::new(true));
@ -312,7 +327,8 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
ctrlc::set_handler(move || {
println!("Ctrl+C triggered...");
r.store(false, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler");
})
.expect("Error setting Ctrl-C handler");
println!("Waiting for Ctrl+C...");
@ -322,14 +338,14 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
} else if let Some(setvolume_matches) = matches.subcommand_matches("setvolume") {
let setvolume_message = SetVolumeMessage::new(match setvolume_matches.value_of("volume") {
Some(s) => s.parse::<f64>()?,
_ => return Err("Timestamp is required.".into())
_ => return Err("Timestamp is required.".into()),
});
println!("Sent setvolume {:?}", setvolume_message);
session.send_message(Opcode::SetVolume, &setvolume_message)?;
} else if let Some(setspeed_matches) = matches.subcommand_matches("setspeed") {
let setspeed_message = SetSpeedMessage::new(match setspeed_matches.value_of("speed") {
Some(s) => s.parse::<f64>()?,
_ => return Err("Speed is required.".into())
_ => return Err("Speed is required.".into()),
});
println!("Sent setspeed {:?}", setspeed_message);
session.send_message(Opcode::SetSpeed, &setspeed_message)?;
@ -340,7 +356,7 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
println!("Waiting on other threads...");
if let Some(v) = join_handle {
if let Err(_) = v.join() {
if v.join().is_err() {
return Err("Failed to join thread.".into());
}
}
@ -364,19 +380,19 @@ impl ServerState {
}
}
fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, running: &Arc<AtomicBool>) -> Result<(String, thread::JoinHandle<Result<(), String>>), String> {
fn host_file_and_get_url(
local_ip: &IpAddr,
file_path: &str,
mime_type: &String,
running: &Arc<AtomicBool>,
) -> Result<(String, thread::JoinHandle<Result<(), String>>), String> {
let local_ip_str = if local_ip.is_ipv6() {
format!("[{}]", local_ip)
} else {
format!("{}", local_ip)
};
let server = {
let this = Server::http(format!("{local_ip_str}:0"));
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Failed to create server: {}", e))(e)),
}
}?;
let server = Server::http(format!("{local_ip_str}:0"))
.map_err(|err| format!("Failed to create server: {err}"))?;
let url = match server.server_addr() {
ListenAddr::IP(addr) => format!("http://{local_ip_str}:{}/", addr.port()),
@ -399,14 +415,9 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String,
}
let should_break = {
let state = {
let this = state.lock();
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Mutex error: {}", e))(e)),
}
}?;
state.active_connections == 0 && state.last_request_time.elapsed() > Duration::from_secs(300)
let state = state.lock().unwrap();
state.active_connections == 0
&& state.last_request_time.elapsed() > Duration::from_secs(300)
};
if should_break {
@ -418,34 +429,18 @@ fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String,
Ok(Some(request)) => {
println!("Request received.");
let mut state = {
let this = state.lock();
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Mutex error: {}", e))(e)),
}
}?;
let mut state = state.lock().unwrap();
state.active_connections += 1;
state.last_request_time = Instant::now();
let file = {
let this = fs::File::open(&file_path_clone);
match this {
Ok(t) => Ok(t),
Err(e) => Err((|_| "Failed to open file.".to_string())(e)),
}
}?;
let file = fs::File::open(&file_path_clone)
.map_err(|_| "Failed to open file.".to_owned())?;
let content_type_header = {
let this = Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str());
match this {
Ok(t) => Ok(t),
Err(e) => Err((|_| "Failed to open file.".to_string())(e)),
}
}?;
let content_type_header =
Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str())
.map_err(|_| "Failed to open file.".to_owned())?;
let response = Response::from_file(file)
.with_header(content_type_header);
let response = Response::from_file(file).with_header(content_type_header);
if let Err(e) = request.respond(response) {
println!("Failed to respond to request: {}", e);

View file

@ -1,79 +0,0 @@
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Debug)]
pub struct PlayMessage {
pub container: String,
pub url: Option<String>,
pub content: Option<String>,
pub time: Option<f64>,
pub speed: Option<f64>,
pub headers: Option<HashMap<String, String>>
}
impl PlayMessage {
pub fn new(container: String, url: Option<String>, content: Option<String>, time: Option<f64>, speed: Option<f64>, headers: Option<HashMap<String, String>>) -> Self {
Self { container, url, content, time, speed, headers }
}
}
#[derive(Serialize, Debug)]
pub struct SeekMessage {
pub time: f64,
}
impl SeekMessage {
pub fn new(time: f64) -> Self {
Self { time }
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub time: f64,
pub duration: f64,
pub speed: f64,
pub state: u8 //0 = None, 1 = Playing, 2 = Paused
}
#[derive(Deserialize, Debug)]
pub struct VolumeUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub volume: f64 //(0-1)
}
#[derive(Serialize, Debug)]
pub struct SetVolumeMessage {
pub volume: f64,
}
impl SetVolumeMessage {
pub fn new(volume: f64) -> Self {
Self { volume }
}
}
#[derive(Serialize, Debug)]
pub struct SetSpeedMessage {
pub speed: f64,
}
impl SetSpeedMessage {
pub fn new(speed: f64) -> Self {
Self { speed }
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackErrorMessage {
pub message: String,
}
#[derive(Deserialize, Debug)]
pub struct VersionMessage {
pub version: u64,
}

View file

@ -0,0 +1,54 @@
use serde::{Deserialize, Serialize};
pub mod v2;
pub mod v3;
#[derive(Deserialize, Debug)]
pub struct PlaybackErrorMessage {
pub message: String,
}
#[derive(Deserialize, Serialize, Debug)]
pub struct VersionMessage {
pub version: u64,
}
#[derive(Serialize, Debug)]
pub struct SetSpeedMessage {
pub speed: f64,
}
impl SetSpeedMessage {
pub fn new(speed: f64) -> Self {
Self { speed }
}
}
#[derive(Deserialize, Debug)]
pub struct VolumeUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub volume: f64, //(0-1)
}
#[derive(Serialize, Debug)]
pub struct SetVolumeMessage {
pub volume: f64,
}
impl SetVolumeMessage {
pub fn new(volume: f64) -> Self {
Self { volume }
}
}
#[derive(Serialize, Debug)]
pub struct SeekMessage {
pub time: f64,
}
impl SeekMessage {
pub fn new(time: f64) -> Self {
Self { time }
}
}

View file

@ -0,0 +1,43 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Debug)]
pub struct PlayMessage {
pub container: String,
pub url: Option<String>,
pub content: Option<String>,
pub time: Option<f64>,
pub speed: Option<f64>,
pub headers: Option<HashMap<String, String>>,
}
impl PlayMessage {
pub fn new(
container: String,
url: Option<String>,
content: Option<String>,
time: Option<f64>,
speed: Option<f64>,
headers: Option<HashMap<String, String>>,
) -> Self {
Self {
container,
url,
content,
time,
speed,
headers,
}
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
pub time: f64,
pub duration: f64,
pub speed: f64,
pub state: u8, //0 = None, 1 = Playing, 2 = Paused
}

View file

@ -0,0 +1,671 @@
use std::collections::HashMap;
use serde::{de, Deserialize, Serialize};
use serde_json::{json, Value};
use serde_repr::{Deserialize_repr, Serialize_repr};
macro_rules! get_from_map {
($map:expr, $key:expr) => {
$map.get($key).ok_or(de::Error::missing_field($key))
};
}
#[derive(Debug, PartialEq, Clone)]
pub enum MetadataObject {
Generic {
title: Option<String>,
thumbnail_url: Option<String>,
custom: Value,
},
}
impl Serialize for MetadataObject {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
MetadataObject::Generic {
title,
thumbnail_url,
custom,
} => {
let mut map = serde_json::Map::new();
map.insert("type".to_owned(), json!(0u64));
map.insert(
"title".to_owned(),
match title {
Some(t) => Value::String(t.to_owned()),
None => Value::Null,
},
);
map.insert(
"thumbnailUrl".to_owned(),
match thumbnail_url {
Some(t) => Value::String(t.to_owned()),
None => Value::Null,
},
);
map.insert("custom".to_owned(), custom.clone());
map.serialize(serializer)
}
}
}
}
// TODO: handle errors
impl<'de> Deserialize<'de> for MetadataObject {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let mut map = serde_json::Map::deserialize(deserializer)?;
let type_ = map
.remove("type")
.ok_or(de::Error::missing_field("type"))?
.as_u64()
.ok_or(de::Error::custom("`type` is not an integer"))?;
let rest = Value::Object(map);
match type_ {
0 => Ok(Self::Generic {
title: rest.get("title").map(|v| v.as_str().unwrap().to_owned()),
thumbnail_url: rest
.get("thumbnailUrl")
.map(|v| v.as_str().unwrap().to_owned()),
custom: rest.get("custom").unwrap().clone(),
}),
_ => Err(de::Error::custom(format!("Unknown metadata type {type_}"))),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct PlayMessage {
/// The MIME type (video/mp4)
pub container: String,
// The URL to load (optional)
pub url: Option<String>,
// The content to load (i.e. a DASH manifest, json content, optional)
pub content: Option<String>,
// The time to start playing in seconds
pub time: Option<f64>,
// The desired volume (0-1)
pub volume: Option<f64>,
// The factor to multiply playback speed by (defaults to 1.0)
pub speed: Option<f64>,
// HTTP request headers to add to the play request Map<string, string>
pub headers: Option<HashMap<String, String>>,
pub metadata: Option<MetadataObject>,
}
#[derive(Deserialize_repr, Serialize_repr, Debug)]
#[repr(u8)]
pub enum ContentType {
Playlist = 0,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct MediaItem {
/// The MIME type (video/mp4)
pub container: String,
/// The URL to load (optional)
pub url: Option<String>,
/// The content to load (i.e. a DASH manifest, json content, optional)
pub content: Option<String>,
/// The time to start playing in seconds
pub time: Option<f64>,
/// The desired volume (0-1)
pub volume: Option<f64>,
/// The factor to multiply playback speed by (defaults to 1.0)
pub speed: Option<f64>,
/// Indicates if the receiver should preload the media item
pub cache: Option<bool>,
/// Indicates how long the item content is presented on screen in seconds
pub showDuration: Option<f64>,
/// HTTP request headers to add to the play request Map<string, string>
pub headers: Option<HashMap<String, String>>,
pub metadata: Option<MetadataObject>,
}
#[derive(Serialize, Debug)]
pub struct PlaylistContent {
#[serde(rename = "type")]
variant: ContentType,
pub items: Vec<MediaItem>,
/// Start position of the first item to play from the playlist
pub offset: Option<u64>, // int or float?
/// The desired volume (0-1)
pub volume: Option<f64>,
/// The factor to multiply playback speed by (defaults to 1.0)
pub speed: Option<f64>,
/// Count of media items should be pre-loaded forward from the current view index
#[serde(rename = "forwardCache")]
pub forward_cache: Option<u64>,
/// Count of media items should be pre-loaded backward from the current view index
#[serde(rename = "backwardCache")]
pub backward_cache: Option<u64>,
pub metadata: Option<MetadataObject>,
}
#[derive(Serialize_repr, Debug)]
#[repr(u8)]
pub enum PlaybackState {
Idle = 0,
Playing = 1,
Paused = 2,
}
#[derive(Serialize, Debug)]
pub struct PlaybackUpdateMessage {
// The time the packet was generated (unix time milliseconds)
pub generationTime: u64,
// The playback state
pub state: PlaybackState,
// The current time playing in seconds
pub time: Option<f64>,
// The duration in seconds
pub duration: Option<f64>,
// The playback speed factor
pub speed: Option<f64>,
// The playlist item index currently being played on receiver
pub itemIndex: Option<u64>,
}
#[derive(Serialize, Debug)]
pub struct InitialSenderMessage {
#[serde(rename = "displayName")]
pub display_name: Option<String>,
#[serde(rename = "appName")]
pub app_name: Option<String>,
#[serde(rename = "appVersion")]
pub app_version: Option<String>,
}
#[derive(Deserialize, Debug)]
pub struct InitialReceiverMessage {
#[serde(rename = "displayName")]
pub display_name: Option<String>,
#[serde(rename = "appName")]
pub app_name: Option<String>,
#[serde(rename = "appVersion")]
pub app_version: Option<String>,
#[serde(rename = "playData")]
pub play_data: Option<PlayMessage>,
}
#[derive(Deserialize, Debug)]
pub struct PlayUpdateMessage {
#[serde(rename = "generationTime")]
pub generation_time: Option<u64>,
#[serde(rename = "playData")]
pub play_data: Option<PlayMessage>,
}
#[derive(Serialize, Debug)]
pub struct SetPlaylistItemMessage {
#[serde(rename = "itemIndex")]
pub item_index: Option<u64>,
}
#[derive(Deserialize, Debug)]
pub enum KeyNames {
ArrowLeft,
ArrowRight,
ArrowUp,
ArrowDown,
Enter,
}
#[derive(Debug, PartialEq)]
pub enum EventSubscribeObject {
MediaItemStart,
MediaItemEnd,
MediaItemChanged,
KeyDown { keys: Vec<String> },
KeyUp { keys: Vec<String> },
}
impl Serialize for EventSubscribeObject {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serde_json::Map::new();
let type_val: u64 = match self {
EventSubscribeObject::MediaItemStart => 0,
EventSubscribeObject::MediaItemEnd => 1,
EventSubscribeObject::MediaItemChanged => 2,
EventSubscribeObject::KeyDown { .. } => 3,
EventSubscribeObject::KeyUp { .. } => 4,
};
map.insert("type".to_owned(), json!(type_val));
let keys = match self {
EventSubscribeObject::KeyDown { keys } => Some(keys),
EventSubscribeObject::KeyUp { keys } => Some(keys),
_ => None,
};
if let Some(keys) = keys {
map.insert("keys".to_owned(), json!(keys));
}
map.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for EventSubscribeObject {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let mut map = serde_json::Map::deserialize(deserializer)?;
let type_ = map
.remove("type")
.ok_or(de::Error::missing_field("type"))?
.as_u64()
.ok_or(de::Error::custom("`type` is not an integer"))?;
let rest = Value::Object(map);
match type_ {
0 => Ok(Self::MediaItemStart),
1 => Ok(Self::MediaItemEnd),
2 => Ok(Self::MediaItemChanged),
3 | 4 => {
let keys = get_from_map!(rest, "keys")?
.as_array()
.ok_or(de::Error::custom("`type` is not an array"))?
.iter()
.map(|v| v.as_str().map(|s| s.to_owned()))
.collect::<Option<Vec<String>>>()
.ok_or(de::Error::custom("`type` is not an array of strings"))?;
if type_ == 3 {
Ok(Self::KeyDown { keys })
} else {
Ok(Self::KeyUp { keys })
}
}
_ => Err(de::Error::custom(format!("Unknown event type {type_}"))),
}
}
}
#[derive(Deserialize, Debug)]
pub struct SubscribeEventMessage {
event: EventSubscribeObject,
}
#[derive(Deserialize, Debug)]
pub struct UnsubscribeEventMessage {
event: EventSubscribeObject,
}
#[derive(Debug, PartialEq, Copy, Clone)]
#[repr(u8)]
pub enum EventType {
MediaItemStart = 0,
MediaItemEnd = 1,
MediaItemChange = 2,
KeyDown = 3,
KeyUp = 4,
}
#[derive(Debug, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum EventObject {
MediaItem {
variant: EventType,
item: MediaItem,
},
Key {
variant: EventType,
key: String,
repeat: bool,
handled: bool,
},
}
impl Serialize for EventObject {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serde_json::Map::new();
match self {
EventObject::MediaItem { variant, item } => {
map.insert("type".to_owned(), json!(*variant as u8));
map.insert("item".to_owned(), serde_json::to_value(item).unwrap());
}
EventObject::Key {
variant,
key,
repeat,
handled,
} => {
map.insert("type".to_owned(), json!(*variant as u8));
map.insert("key".to_owned(), serde_json::to_value(key).unwrap());
map.insert("repeat".to_owned(), serde_json::to_value(repeat).unwrap());
map.insert("handled".to_owned(), serde_json::to_value(handled).unwrap());
}
}
map.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for EventObject {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
let mut map = serde_json::Map::deserialize(deserializer)?;
let type_ = map
.remove("type")
.ok_or(de::Error::missing_field("type"))?
.as_u64()
.ok_or(de::Error::custom("`type` is not an integer"))?;
let rest = Value::Object(map);
match type_ {
#[allow(clippy::manual_range_patterns)]
0 | 1 | 2 => {
let variant = match type_ {
0 => EventType::MediaItemStart,
1 => EventType::MediaItemEnd,
_ => EventType::MediaItemChange,
};
let item = get_from_map!(rest, "item")?;
Ok(Self::MediaItem {
variant,
item: MediaItem::deserialize(item).unwrap(),
})
}
3 | 4 => {
let variant = if type_ == 3 {
EventType::KeyDown
} else {
EventType::KeyUp
};
Ok(Self::Key {
variant,
key: get_from_map!(rest, "key")?.as_str().unwrap().to_owned(),
repeat: get_from_map!(rest, "repeat")?.as_bool().unwrap(),
handled: get_from_map!(rest, "handled")?.as_bool().unwrap(),
})
}
_ => Err(de::Error::custom(format!("Unknown event type {type_}"))),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct EventMessage {
#[serde(rename = "generationTime")]
pub generation_time: u64,
event: EventObject,
}
#[cfg(test)]
mod tests {
use super::*;
macro_rules! s {
($s:expr) => {
($s).to_string()
};
}
#[test]
fn serialize_metadata_object() {
assert_eq!(
&serde_json::to_string(&MetadataObject::Generic {
title: Some(s!("abc")),
thumbnail_url: Some(s!("def")),
custom: serde_json::Value::Null,
})
.unwrap(),
r#"{"custom":null,"thumbnailUrl":"def","title":"abc","type":0}"#
);
assert_eq!(
&serde_json::to_string(&MetadataObject::Generic {
title: None,
thumbnail_url: None,
custom: serde_json::Value::Null,
})
.unwrap(),
r#"{"custom":null,"thumbnailUrl":null,"title":null,"type":0}"#
);
}
#[test]
fn deserialize_metadata_object() {
assert_eq!(
serde_json::from_str::<MetadataObject>(
r#"{"type":0,"title":"abc","thumbnailUrl":"def","custom":null}"#
)
.unwrap(),
MetadataObject::Generic {
title: Some(s!("abc")),
thumbnail_url: Some(s!("def")),
custom: serde_json::Value::Null,
}
);
assert_eq!(
serde_json::from_str::<MetadataObject>(r#"{"type":0,"custom":null}"#).unwrap(),
MetadataObject::Generic {
title: None,
thumbnail_url: None,
custom: serde_json::Value::Null,
}
);
assert!(serde_json::from_str::<MetadataObject>(r#"{"type":1"#).is_err());
}
#[test]
fn serialize_event_sub_obj() {
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::MediaItemStart).unwrap(),
r#"{"type":0}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::MediaItemEnd).unwrap(),
r#"{"type":1}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::MediaItemChanged).unwrap(),
r#"{"type":2}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(),
r#"{"keys":[],"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown { keys: vec![] }).unwrap(),
r#"{"keys":[],"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyUp {
keys: vec![s!("abc"), s!("def")]
})
.unwrap(),
r#"{"keys":["abc","def"],"type":4}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown {
keys: vec![s!("abc"), s!("def")]
})
.unwrap(),
r#"{"keys":["abc","def"],"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventSubscribeObject::KeyDown {
keys: vec![s!("\"\"")]
})
.unwrap(),
r#"{"keys":["\"\""],"type":3}"#
);
}
#[test]
fn deserialize_event_sub_obj() {
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"type":0}"#).unwrap(),
EventSubscribeObject::MediaItemStart
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"type":1}"#).unwrap(),
EventSubscribeObject::MediaItemEnd
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"type":2}"#).unwrap(),
EventSubscribeObject::MediaItemChanged
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":[],"type":3}"#).unwrap(),
EventSubscribeObject::KeyDown { keys: vec![] }
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":[],"type":4}"#).unwrap(),
EventSubscribeObject::KeyUp { keys: vec![] }
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":["abc","def"],"type":3}"#)
.unwrap(),
EventSubscribeObject::KeyDown {
keys: vec![s!("abc"), s!("def")]
}
);
assert_eq!(
serde_json::from_str::<EventSubscribeObject>(r#"{"keys":["abc","def"],"type":4}"#)
.unwrap(),
EventSubscribeObject::KeyUp {
keys: vec![s!("abc"), s!("def")]
}
);
assert!(serde_json::from_str::<EventSubscribeObject>(r#""type":5}"#).is_err());
}
const EMPTY_TEST_MEDIA_ITEM: MediaItem = MediaItem {
container: String::new(),
url: None,
content: None,
time: None,
volume: None,
speed: None,
cache: None,
showDuration: None,
headers: None,
metadata: None,
};
const TEST_MEDIA_ITEM_JSON: &str = r#"{"cache":null,"container":"","content":null,"headers":null,"metadata":null,"showDuration":null,"speed":null,"time":null,"url":null,"volume":null}"#;
#[test]
fn serialize_event_obj() {
assert_eq!(
serde_json::to_string(&EventObject::MediaItem {
variant: EventType::MediaItemStart,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
})
.unwrap(),
format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"#),
);
assert_eq!(
serde_json::to_string(&EventObject::MediaItem {
variant: EventType::MediaItemEnd,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
})
.unwrap(),
format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"#),
);
assert_eq!(
serde_json::to_string(&EventObject::MediaItem {
variant: EventType::MediaItemChange,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
})
.unwrap(),
format!(r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"#),
);
assert_eq!(
&serde_json::to_string(&EventObject::Key {
variant: EventType::KeyDown,
key: s!(""),
repeat: false,
handled: false,
})
.unwrap(),
r#"{"handled":false,"key":"","repeat":false,"type":3}"#
);
assert_eq!(
&serde_json::to_string(&EventObject::Key {
variant: EventType::KeyUp,
key: s!(""),
repeat: false,
handled: false,
})
.unwrap(),
r#"{"handled":false,"key":"","repeat":false,"type":4}"#
);
}
#[test]
fn deserialize_event_obj() {
assert_eq!(
serde_json::from_str::<EventObject>(&format!(
r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":0}}"#
))
.unwrap(),
EventObject::MediaItem {
variant: EventType::MediaItemStart,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
}
);
assert_eq!(
serde_json::from_str::<EventObject>(&format!(
r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":1}}"#
))
.unwrap(),
EventObject::MediaItem {
variant: EventType::MediaItemEnd,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
}
);
assert_eq!(
serde_json::from_str::<EventObject>(&format!(
r#"{{"item":{TEST_MEDIA_ITEM_JSON},"type":2}}"#
))
.unwrap(),
EventObject::MediaItem {
variant: EventType::MediaItemChange,
item: EMPTY_TEST_MEDIA_ITEM.clone(),
}
);
assert_eq!(
serde_json::from_str::<EventObject>(
r#"{"handled":false,"key":"","repeat":false,"type":3}"#
)
.unwrap(),
EventObject::Key {
variant: EventType::KeyDown,
key: s!(""),
repeat: false,
handled: false,
}
);
assert_eq!(
serde_json::from_str::<EventObject>(
r#"{"handled":false,"key":"","repeat":false,"type":4}"#
)
.unwrap(),
EventObject::Key {
variant: EventType::KeyUp,
key: s!(""),
repeat: false,
handled: false,
}
);
assert!(serde_json::from_str::<EventObject>(r#"{"type":5}"#).is_err());
}
}

View file

@ -1,12 +1,13 @@
use std::io::{Read, Write};
use std::net::TcpStream;
use tungstenite::Message;
use tungstenite::protocol::WebSocket;
use tungstenite::Message;
pub trait Transport {
fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error>;
fn transport_shutdown(&mut self) -> Result<(), std::io::Error>;
fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error>;
}
impl Transport for TcpStream {
@ -21,6 +22,10 @@ impl Transport for TcpStream {
fn transport_shutdown(&mut self) -> Result<(), std::io::Error> {
self.shutdown(std::net::Shutdown::Both)
}
fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> {
self.read_exact(buf)
}
}
impl<T: Read + Write> Transport for WebSocket<T> {
@ -30,27 +35,69 @@ impl<T: Read + Write> Transport for WebSocket<T> {
let len = std::cmp::min(buf.len(), data.len());
buf[..len].copy_from_slice(&data[..len]);
Ok(len)
},
_ => Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid message type"))
}
_ => Err(std::io::Error::other("Invalid message type")),
}
}
fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
self.write(Message::Binary(buf.to_vec()))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
self.flush().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map_err(std::io::Error::other)?;
self.flush().map_err(std::io::Error::other)
}
fn transport_shutdown(&mut self) -> Result<(), std::io::Error> {
self.close(None).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
self.close(None).map_err(std::io::Error::other)?;
loop {
match self.read() {
Ok(_) => continue,
Err(tungstenite::Error::ConnectionClosed) => break,
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
Err(e) => return Err(std::io::Error::other(e)),
}
}
Ok(())
}
}
fn transport_read_exact(&mut self, buf: &mut [u8]) -> Result<(), std::io::Error> {
let mut total_read = 0;
while total_read < buf.len() {
total_read += self.transport_read(&mut buf[total_read..])?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::net::TcpListener;
use super::*;
#[test]
fn websocket_read_exact() {
let jh = std::thread::spawn(|| {
let server = TcpListener::bind("127.0.0.1:51234").unwrap();
let stream = server.incoming().next().unwrap().unwrap();
let mut websocket = tungstenite::accept(stream).unwrap();
websocket
.send(tungstenite::Message::binary([1, 2, 3]))
.unwrap();
});
let (mut websocket, _) = tungstenite::connect("ws://127.0.0.1:51234").unwrap();
fn read_exact<T: Transport>(stream: &mut T) {
let mut buf = [0u8; 3];
stream.transport_read_exact(&mut buf).unwrap();
assert_eq!(buf, [1, 2, 3]);
}
read_exact(&mut websocket);
websocket.close(None).unwrap();
jh.join().unwrap();
}
}