1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-08-07 17:52:49 +00:00

Initial commit.

This commit is contained in:
Koen 2023-06-20 08:45:01 +02:00
commit c8394f6a8e
99 changed files with 8173 additions and 0 deletions

View file

@ -0,0 +1,242 @@
use std::{net::TcpStream, io::{Write, Read}, sync::{atomic::{AtomicBool, Ordering}, Arc}};
use crate::models::{PlaybackUpdateMessage, VolumeUpdateMessage};
use serde::Serialize;
#[derive(Debug)]
enum SessionState {
Idle = 0,
WaitingForLength,
WaitingForData,
Disconnected,
}
#[derive(Debug)]
pub enum Opcode {
None = 0,
Play = 1,
Pause = 2,
Resume = 3,
Stop = 4,
Seek = 5,
PlaybackUpdate = 6,
VolumeUpdate = 7,
SetVolume = 8
}
impl Opcode {
fn from_u8(value: u8) -> Opcode {
match value {
0 => Opcode::None,
1 => Opcode::Play,
2 => Opcode::Pause,
3 => Opcode::Resume,
4 => Opcode::Stop,
5 => Opcode::Seek,
6 => Opcode::PlaybackUpdate,
7 => Opcode::VolumeUpdate,
8 => Opcode::SetVolume,
_ => panic!("Unknown value: {}", value),
}
}
}
const LENGTH_BYTES: usize = 4;
const MAXIMUM_PACKET_LENGTH: usize = 32000;
pub struct FCastSession<'a> {
buffer: Vec<u8>,
bytes_read: usize,
packet_length: usize,
stream: &'a TcpStream,
state: SessionState
}
impl<'a> FCastSession<'a> {
pub fn new(stream: &'a TcpStream) -> Self {
FCastSession {
buffer: vec![0; MAXIMUM_PACKET_LENGTH],
bytes_read: 0,
packet_length: 0,
stream,
state: SessionState::Idle
}
}
}
impl FCastSession<'_> {
pub fn send_message<T: Serialize>(&mut self, opcode: Opcode, message: T) -> Result<(), std::io::Error> {
let json = serde_json::to_string(&message)?;
let data = json.as_bytes();
let size = 1 + data.len();
let header_size = LENGTH_BYTES + 1;
let mut header = vec![0u8; header_size];
header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes());
header[LENGTH_BYTES] = opcode as u8;
let packet = [header, data.to_vec()].concat();
println!("Sent {} bytes with (header size: {}, body size: {}).", packet.len(), header_size, data.len());
return self.stream.write_all(&packet);
}
pub fn send_empty(&mut self, opcode: Opcode) -> Result<(), std::io::Error> {
let json = String::new();
let data = json.as_bytes();
let size = 1 + data.len();
let mut header = vec![0u8; LENGTH_BYTES + 1];
header[..LENGTH_BYTES].copy_from_slice(&(size as u32).to_le_bytes());
header[LENGTH_BYTES] = opcode as u8;
let packet = [header, data.to_vec()].concat();
return self.stream.write_all(&packet);
}
pub fn receive_loop(&mut self, running: &Arc<AtomicBool>) -> Result<(), Box<dyn std::error::Error>> {
self.state = SessionState::WaitingForLength;
let mut buffer = [0u8; 1024];
while running.load(Ordering::SeqCst) {
let bytes_read = self.stream.read(&mut buffer)?;
self.process_bytes(&buffer[..bytes_read])?;
}
self.state = SessionState::Idle;
Ok(())
}
pub fn process_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
if received_bytes.is_empty() {
return Ok(());
}
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
println!("{} bytes received from {}", received_bytes.len(), addr);
match self.state {
SessionState::WaitingForLength => self.handle_length_bytes(received_bytes)?,
SessionState::WaitingForData => self.handle_packet_bytes(received_bytes)?,
_ => println!("Data received is unhandled in current session state {:?}", self.state),
}
Ok(())
}
fn handle_length_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let bytes_to_read = std::cmp::min(LENGTH_BYTES, received_bytes.len());
let bytes_remaining = received_bytes.len() - bytes_to_read;
self.buffer[self.bytes_read..self.bytes_read + bytes_to_read]
.copy_from_slice(&received_bytes[..bytes_to_read]);
self.bytes_read += bytes_to_read;
println!("handleLengthBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= LENGTH_BYTES {
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
self.state = SessionState::WaitingForData;
self.packet_length = u32::from_le_bytes(self.buffer[..LENGTH_BYTES].try_into()?) as usize;
self.bytes_read = 0;
println!("Packet length header received from {}: {}", addr, self.packet_length);
if self.packet_length > MAXIMUM_PACKET_LENGTH {
println!("Maximum packet length is 32kB, killing stream {}: {}", addr, self.packet_length);
self.stream.shutdown(std::net::Shutdown::Both)?;
self.state = SessionState::Disconnected;
return Err(format!("Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", self.packet_length).into());
}
if bytes_remaining > 0 {
println!("{} remaining bytes {} pushed to handlePacketBytes", bytes_remaining, addr);
self.handle_packet_bytes(&received_bytes[bytes_to_read..])?;
}
}
Ok(())
}
fn handle_packet_bytes(&mut self, received_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let bytes_to_read = std::cmp::min(self.packet_length, received_bytes.len());
let bytes_remaining = received_bytes.len() - bytes_to_read;
self.buffer[self.bytes_read..self.bytes_read + bytes_to_read]
.copy_from_slice(&received_bytes[..bytes_to_read]);
self.bytes_read += bytes_to_read;
println!("handlePacketBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= self.packet_length {
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
println!("Packet finished receiving from {} of {} bytes.", addr, self.packet_length);
self.handle_packet()?;
self.state = SessionState::WaitingForLength;
self.packet_length = 0;
self.bytes_read = 0;
if bytes_remaining > 0 {
println!("{} remaining bytes {} pushed to handleLengthBytes", bytes_remaining, addr);
self.handle_length_bytes(&received_bytes[bytes_to_read..])?;
}
}
Ok(())
}
fn handle_packet(&mut self) -> Result<(), std::str::Utf8Error> {
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
println!("Processing packet of {} bytes from {}", self.bytes_read, addr);
let opcode = Opcode::from_u8(self.buffer[0]);
let body = if self.packet_length > 1 {
Some(std::str::from_utf8(&self.buffer[1..self.packet_length])?)
} else {
None
};
println!("Received body: {:?}", body);
match opcode {
Opcode::PlaybackUpdate => {
if let Some(body_str) = body {
if let Ok(playback_update_msg) = serde_json::from_str::<PlaybackUpdateMessage>(body_str) {
println!("Received playback update {:?}", playback_update_msg);
}
}
}
Opcode::VolumeUpdate => {
if let Some(body_str) = body {
if let Ok(volume_update_msg) = serde_json::from_str::<VolumeUpdateMessage>(body_str) {
println!("Received volume update {:?}", volume_update_msg);
}
}
}
_ => {
println!("Error handling packet from {}", addr);
}
}
Ok(())
}
pub fn shutdown(&self) -> Result<(), std::io::Error> {
return self.stream.shutdown(std::net::Shutdown::Both);
}
}

View file

@ -0,0 +1,193 @@
mod models;
mod fcastsession;
use clap::{App, Arg, SubCommand};
use std::{io::Read, net::TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::fcastsession::Opcode;
use crate::models::SetVolumeMessage;
use crate::{models::{PlayMessage, SeekMessage}, fcastsession::FCastSession};
fn main() {
if let Err(e) = run() {
println!("Failed due to error: {}", e)
}
}
fn run() -> Result<(), Box<dyn std::error::Error>> {
let app = App::new("Media Control")
.about("Control media playback")
.arg(Arg::with_name("host")
.short('h')
.long("host")
.value_name("Host")
.help("The host address to send the command to")
.required(true)
.takes_value(true))
.arg(Arg::with_name("port")
.short('p')
.long("port")
.value_name("PORT")
.help("The port to send the command to")
.required(false)
.default_value("46899")
.takes_value(true))
.subcommand(SubCommand::with_name("play")
.about("Play media")
.arg(Arg::with_name("mime_type")
.short('m')
.long("mime_type")
.value_name("MIME_TYPE")
.help("Mime type (e.g., video/mp4)")
.required(true)
.takes_value(true)
)
.arg(Arg::with_name("url")
.short('u')
.long("url")
.value_name("URL")
.help("URL to the content")
.required(false)
.takes_value(true)
)
.arg(Arg::with_name("content")
.short('c')
.long("content")
.value_name("CONTENT")
.help("The actual content")
.required(false)
.takes_value(true)
)
.arg(Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(false)
.default_value("0")
.takes_value(true)
)
)
.subcommand(SubCommand::with_name("seek")
.about("Seek to a timestamp")
.arg(Arg::with_name("timestamp")
.short('t')
.long("timestamp")
.value_name("TIMESTAMP")
.help("Timestamp to start playing")
.required(true)
.takes_value(true)
),
)
.subcommand(SubCommand::with_name("pause").about("Pause media"))
.subcommand(SubCommand::with_name("resume").about("Resume media"))
.subcommand(SubCommand::with_name("stop").about("Stop media"))
.subcommand(SubCommand::with_name("listen").about("Listen to incoming events"))
.subcommand(SubCommand::with_name("setvolume").about("Set the volume")
.arg(Arg::with_name("volume")
.short('v')
.long("volume")
.value_name("VOLUME")
.help("Volume level (0-1)")
.required(true)
.takes_value(true))
);
let matches = app.get_matches();
let host = match matches.value_of("host") {
Some(s) => s,
_ => return Err("Host is required.".into())
};
let port = match matches.value_of("port") {
Some(s) => s,
_ => return Err("Port is required.".into())
};
println!("Connecting to host={} port={}...", host, port);
let stream = TcpStream::connect(format!("{}:{}", host, port))?;
let mut session = FCastSession::new(&stream);
println!("Connection established.");
if let Some(play_matches) = matches.subcommand_matches("play") {
let mut play_message = PlayMessage::new(
match play_matches.value_of("mime_type") {
Some(s) => s.to_string(),
_ => return Err("MIME type is required.".into())
},
match play_matches.value_of("url") {
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("content") {
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("timestamp") {
Some(s) => s.parse::<u64>().ok(),
_ => None
}
);
if play_message.content.is_none() && play_message.url.is_none() {
println!("Reading content from stdin...");
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;
play_message.content = Some(buffer);
}
let json = serde_json::to_string(&play_message);
println!("Sent play {:?}", json);
session.send_message(Opcode::Play, Some(play_message))?;
} else if let Some(seek_matches) = matches.subcommand_matches("seek") {
let seek_message = SeekMessage::new(match seek_matches.value_of("timestamp") {
Some(s) => s.parse::<u64>()?,
_ => return Err("Timestamp is required.".into())
});
println!("Sent seek {:?}", seek_message);
session.send_message(Opcode::Seek, Some(seek_message))?;
} else if let Some(_) = matches.subcommand_matches("pause") {
println!("Sent pause");
session.send_empty(Opcode::Pause)?;
} else if let Some(_) = matches.subcommand_matches("resume") {
println!("Sent resume");
session.send_empty(Opcode::Resume)?;
} else if let Some(_) = matches.subcommand_matches("stop") {
println!("Sent stop");
session.send_empty(Opcode::Stop)?;
} else if let Some(_) = matches.subcommand_matches("listen") {
println!("Starter listening to events...");
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler");
println!("Waiting for Ctrl+C...");
session.receive_loop(&running)?;
println!("Ctrl+C received, exiting...");
} else if let Some(setvolume_matches) = matches.subcommand_matches("setvolume") {
let setvolume_message = SetVolumeMessage::new(match setvolume_matches.value_of("volume") {
Some(s) => s.parse::<f64>()?,
_ => return Err("Timestamp is required.".into())
});
println!("Sent setvolume {:?}", setvolume_message);
session.send_message(Opcode::SetVolume, Some(setvolume_message))?;
} else {
println!("Invalid command. Use --help for more information.");
std::process::exit(1);
}
session.shutdown()?;
Ok(())
}

View file

@ -0,0 +1,48 @@
use serde::{Serialize, Deserialize};
#[derive(Serialize, Debug)]
pub struct PlayMessage {
pub container: String,
pub url: Option<String>,
pub content: Option<String>,
pub time: Option<u64>,
}
impl PlayMessage {
pub fn new(container: String, url: Option<String>, content: Option<String>, time: Option<u64>) -> Self {
Self { container, url, content, time }
}
}
#[derive(Serialize, Debug)]
pub struct SeekMessage {
pub time: u64,
}
impl SeekMessage {
pub fn new(time: u64) -> Self {
Self { time }
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackUpdateMessage {
pub time: u64,
pub state: u8 //0 = None, 1 = Playing, 2 = Paused
}
#[derive(Deserialize, Debug)]
pub struct VolumeUpdateMessage {
pub volume: f64 //(0-1)
}
#[derive(Serialize, Debug)]
pub struct SetVolumeMessage {
pub volume: f64,
}
impl SetVolumeMessage {
pub fn new(volume: f64) -> Self {
Self { volume }
}
}