1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-06-24 21:25:23 +00:00

Finished first version of Chrome extension to cast to FCast. Added support for WebSocket to terminal client. Added global support for setting playback speed. Added support for casting local file using terminal client. Added global support for playback error messages. Fixed crash caused by failing to unregister MDNS. Fixed issue where subtitles would always show for HLS. Added support for fractional seconds globally. Layout fixes to desktop casting client. Added footer telling user they can close the window.

This commit is contained in:
Koen 2023-12-07 16:10:18 +01:00
parent fd9a63dac0
commit 18b61d549c
26 changed files with 1116 additions and 193 deletions

View file

@ -2,7 +2,9 @@ let mediaUrls = [];
let hosts = [];
let currentWebSocket = null;
let playbackState = null;
let playbackStateUpdateTime = null;
let volume = 1.0;
let volumeUpdateTime = null;
let selectedHost = null;
const Opcode = {
@ -35,7 +37,6 @@ chrome.runtime.onInstalled.addListener(function() {
chrome.webRequest.onHeadersReceived.addListener(
function(details) {
console.log(`onHeadersReceived (${details.url})`, details);
const contentType = details.responseHeaders.find(header => header.name.toLowerCase() === 'content-type')?.value;
if (!contentType) {
return;
@ -48,10 +49,16 @@ chrome.webRequest.onHeadersReceived.addListener(
const isSegment = details.url.endsWith(".ts");
if (contentType && isMedia && !isSegment) {
if (!mediaUrls.some(v => v.url === details.url))
mediaUrls.push({contentType, url: details.url});
console.log('Media URL found:', {contentType, url: details.url});
notifyPopup('updateUrls');
if (!mediaUrls.some(v => v.url === details.url)) {
mediaUrls.unshift({contentType, url: details.url});
if (mediaUrls.length > 5) {
mediaUrls.pop();
}
notifyPopup('updateUrls');
}
}
},
{ urls: ["<all_urls>"] },
@ -114,6 +121,8 @@ chrome.runtime.onMessage.addListener(function(request, sender, sendResponse) {
} else if (request.action === 'stop') {
stop(selectedHost);
} else if (request.action === 'setVolume') {
volumeUpdateTime = Date.now();
volume = request.volume;
setVolume(selectedHost, request.volume);
} else if (request.action === 'seek') {
seek(selectedHost, request.time);
@ -241,8 +250,10 @@ function maintainWebSocketConnection(host) {
try {
const playbackUpdateMsg = JSON.parse(body);
console.log("Received playback update", playbackUpdateMsg);
playbackState = playbackUpdateMsg;
notifyPopup('updatePlaybackState');
if (playbackStateUpdateTime == null || playbackStateUpdateTime.generationTime > playbackStateUpdateTime) {
playbackState = playbackUpdateMsg;
notifyPopup('updatePlaybackState');
}
} catch (error) {
console.error("Error parsing playback update message:", error);
}
@ -254,8 +265,11 @@ function maintainWebSocketConnection(host) {
try {
const volumeUpdateMsg = JSON.parse(body);
console.log("Received volume update", volumeUpdateMsg);
volume = volumeUpdateMsg;
notifyPopup('updateVolume');
if (volumeUpdateTime == null || volumeUpdateMsg.generationTime > volumeUpdateTime) {
volume = volumeUpdateMsg.volume;
volumeUpdateTime = volumeUpdateMsg.generationTime;
notifyPopup('updateVolume');
}
} catch (error) {
console.error("Error parsing volume update message:", error);
}

View file

@ -51,6 +51,7 @@ function updateUrlList() {
castButton.disabled = !response.selectedHost;
castButton.addEventListener('click', function() {
if (response.selectedHost) {
console.log("castVideo", url);
chrome.runtime.sendMessage({ action: 'castVideo', url });
}
});

View file

@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "ascii"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16"
[[package]]
name = "atty"
version = "0.2.14"
@ -25,12 +31,39 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chunked_transfer"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901"
[[package]]
name = "clap"
version = "3.2.23"
@ -55,6 +88,25 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "cpufeatures"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0"
dependencies = [
"libc",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "ctrlc"
version = "3.2.5"
@ -65,6 +117,22 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "data-encoding"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "fcast"
version = "0.1.0"
@ -73,6 +141,45 @@ dependencies = [
"ctrlc",
"serde",
"serde_json",
"tiny_http",
"tungstenite",
"url",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
dependencies = [
"percent-encoding",
]
[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
@ -90,6 +197,39 @@ dependencies = [
"libc",
]
[[package]]
name = "http"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "httparse"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]]
name = "httpdate"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "idna"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.9.2"
@ -108,9 +248,15 @@ checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "libc"
version = "0.2.140"
version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]]
name = "log"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "nix"
@ -131,23 +277,65 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267"
[[package]]
name = "proc-macro2"
version = "1.0.53"
name = "percent-encoding"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba466839c78239c09faf015484e5cc04860f88242cff4d03eb038f04b4699b73"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.26"
version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "ryu"
version = "1.0.13"
@ -156,18 +344,18 @@ checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "serde"
version = "1.0.158"
version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "771d4d9c4163ee138805e12c710dd365e4f44be8be0503cb1bb9eb989425d9c9"
checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.158"
version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e801c1712f48475582b7696ac71e0ca34ebb30e09338425384269d9717c62cad"
checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@ -185,6 +373,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
@ -199,9 +398,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "2.0.8"
version = "2.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcc02725fd69ab9f26eab07fad303e2497fad6fb9eba4f96c4d1687bdf704ad9"
checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a"
dependencies = [
"proc-macro2",
"quote",
@ -223,12 +422,128 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tiny_http"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82"
dependencies = [
"ascii",
"chunked_transfer",
"httpdate",
"log",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-bidi"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416"
[[package]]
name = "unicode-ident"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
"tinyvec",
]
[[package]]
name = "url"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.3.9"

View file

@ -10,3 +10,6 @@ clap = "3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
ctrlc = "3.1.9"
tungstenite = "0.21.0"
url = "2.5.0"
tiny_http = "0.12.0"

View file

@ -15,8 +15,14 @@ cargo build
Example usage of the fcast client.
```
# Play a mp4 video URL
./fcast -h localhost play --mime_type video/mp4 --url http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -t 10
# Play a mp4 video URL (1.0 playbackspeed explicit)
./fcast -h localhost play --mime_type video/mp4 --url http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -t 10 -s 1.0
# Play a mp4 video URL using WebSockets
./fcast -h localhost -c ws play --mime_type video/mp4 --url http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -t 10
# Play a local mp4
./fcast -h 192.168.1.62 play --mime_type video/mp4 -f /home/koen/Downloads/BigBuckBunny.mp4
# Play a DASH URL
./fcast -h localhost play --mime_type application/dash+xml --url https://dash.akamaized.net/digitalprimates/fraunhofer/480p_video/heaac_2_0_with_video/Sintel/sintel_480p_heaac2_0.mpd
@ -41,4 +47,7 @@ cat dash.mpd | ./fcast -h localhost play --mime_type application/dash+xml
# Set volume to half
./fcast -h localhost setvolume -v 0.5
# Set speed to double
./fcast -h localhost setspeed -s 2.0
```

View file

@ -1,6 +1,6 @@
use std::{net::TcpStream, io::{Write, Read}, sync::{atomic::{AtomicBool, Ordering}, Arc}};
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
use crate::models::{PlaybackUpdateMessage, VolumeUpdateMessage};
use crate::{models::{PlaybackUpdateMessage, VolumeUpdateMessage, PlaybackErrorMessage}, transport::Transport};
use serde::Serialize;
#[derive(Debug)]
@ -21,7 +21,9 @@ pub enum Opcode {
Seek = 5,
PlaybackUpdate = 6,
VolumeUpdate = 7,
SetVolume = 8
SetVolume = 8,
PlaybackError = 9,
SetSpeed = 10
}
impl Opcode {
@ -36,6 +38,8 @@ impl Opcode {
6 => Opcode::PlaybackUpdate,
7 => Opcode::VolumeUpdate,
8 => Opcode::SetVolume,
9 => Opcode::PlaybackError,
10 => Opcode::SetSpeed,
_ => panic!("Unknown value: {}", value),
}
}
@ -48,17 +52,17 @@ pub struct FCastSession<'a> {
buffer: Vec<u8>,
bytes_read: usize,
packet_length: usize,
stream: &'a TcpStream,
stream: Box<dyn Transport + 'a>,
state: SessionState
}
impl<'a> FCastSession<'a> {
pub fn new(stream: &'a TcpStream) -> Self {
pub fn new<T: Transport + 'a>(stream: T) -> Self {
FCastSession {
buffer: vec![0; MAXIMUM_PACKET_LENGTH],
bytes_read: 0,
packet_length: 0,
stream,
stream: Box::new(stream),
state: SessionState::Idle
}
}
@ -76,7 +80,7 @@ impl FCastSession<'_> {
let packet = [header, data.to_vec()].concat();
println!("Sent {} bytes with (header size: {}, body size: {}).", packet.len(), header_size, data.len());
return self.stream.write_all(&packet);
return self.stream.transport_write(&packet);
}
pub fn send_empty(&mut self, opcode: Opcode) -> Result<(), std::io::Error> {
@ -88,7 +92,7 @@ impl FCastSession<'_> {
header[LENGTH_BYTES] = opcode as u8;
let packet = [header, data.to_vec()].concat();
return self.stream.write_all(&packet);
return self.stream.transport_write(&packet);
}
pub fn receive_loop(&mut self, running: &Arc<AtomicBool>) -> Result<(), Box<dyn std::error::Error>> {
@ -96,7 +100,7 @@ impl FCastSession<'_> {
let mut buffer = [0u8; 1024];
while running.load(Ordering::SeqCst) {
let bytes_read = self.stream.read(&mut buffer)?;
let bytes_read = self.stream.transport_read(&mut buffer)?;
self.process_bytes(&buffer[..bytes_read])?;
}
@ -109,12 +113,7 @@ impl FCastSession<'_> {
return Ok(());
}
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
println!("{} bytes received from {}", received_bytes.len(), addr);
println!("{} bytes received", received_bytes.len());
match self.state {
SessionState::WaitingForLength => self.handle_length_bytes(received_bytes)?,
@ -136,27 +135,22 @@ impl FCastSession<'_> {
println!("handleLengthBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= LENGTH_BYTES {
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
self.state = SessionState::WaitingForData;
self.packet_length = u32::from_le_bytes(self.buffer[..LENGTH_BYTES].try_into()?) as usize;
self.bytes_read = 0;
println!("Packet length header received from {}: {}", addr, self.packet_length);
println!("Packet length header received from: {}", self.packet_length);
if self.packet_length > MAXIMUM_PACKET_LENGTH {
println!("Maximum packet length is 32kB, killing stream {}: {}", addr, self.packet_length);
println!("Maximum packet length is 32kB, killing stream: {}", self.packet_length);
self.stream.shutdown(std::net::Shutdown::Both)?;
self.stream.transport_shutdown()?;
self.state = SessionState::Disconnected;
return Err(format!("Stream killed due to packet length ({}) exceeding maximum 32kB packet size.", self.packet_length).into());
}
if bytes_remaining > 0 {
println!("{} remaining bytes {} pushed to handlePacketBytes", bytes_remaining, addr);
println!("{} remaining bytes pushed to handlePacketBytes", bytes_remaining);
self.handle_packet_bytes(&received_bytes[bytes_to_read..])?;
}
@ -175,12 +169,7 @@ impl FCastSession<'_> {
println!("handlePacketBytes: Read {} bytes from packet", bytes_to_read);
if self.bytes_read >= self.packet_length {
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
println!("Packet finished receiving from {} of {} bytes.", addr, self.packet_length);
println!("Packet finished receiving of {} bytes.", self.packet_length);
self.handle_packet()?;
self.state = SessionState::WaitingForLength;
@ -188,7 +177,7 @@ impl FCastSession<'_> {
self.bytes_read = 0;
if bytes_remaining > 0 {
println!("{} remaining bytes {} pushed to handleLengthBytes", bytes_remaining, addr);
println!("{} remaining bytes pushed to handleLengthBytes", bytes_remaining);
self.handle_length_bytes(&received_bytes[bytes_to_read..])?;
}
}
@ -197,12 +186,7 @@ impl FCastSession<'_> {
}
fn handle_packet(&mut self) -> Result<(), std::str::Utf8Error> {
let addr = match self.stream.peer_addr() {
Ok(a) => a.to_string(),
_ => String::new()
};
println!("Processing packet of {} bytes from {}", self.bytes_read, addr);
println!("Processing packet of {} bytes", self.bytes_read);
let opcode = Opcode::from_u8(self.buffer[0]);
let body = if self.packet_length > 1 {
@ -228,15 +212,22 @@ impl FCastSession<'_> {
}
}
}
Opcode::PlaybackError => {
if let Some(body_str) = body {
if let Ok(playback_error_msg) = serde_json::from_str::<PlaybackErrorMessage>(body_str) {
println!("Received playback error {:?}", playback_error_msg);
}
}
}
_ => {
println!("Error handling packet from {}", addr);
println!("Error handling packet");
}
}
Ok(())
}
pub fn shutdown(&self) -> Result<(), std::io::Error> {
return self.stream.shutdown(std::net::Shutdown::Both);
pub fn shutdown(&mut self) -> Result<(), std::io::Error> {
return self.stream.transport_shutdown();
}
}

View file

@ -0,0 +1,36 @@
struct FileServer {
base_url: String,
base_path: String,
}
impl FileServer {
fn new(base_url: String, base_path: String) -> Self {
FileServer { base_url, base_path }
}
async fn serve(&self) {
let file_server = warp::fs::dir(&self.base_path);
warp::serve(file_server).run(([127, 0, 0, 1], 3030)).await;
}
fn get_url(&self, file_name: &str) -> String {
format!("{}/{}", self.base_url, file_name)
}
}
pub async fn host_file_and_get_url(file_path: &str) -> Result<String, Box<dyn std::error::Error>> {
let file_name = Path::new(file_path).file_name().ok_or("Invalid file path")?.to_str().ok_or("Invalid file name")?;
let file_server = FileServer::new("http://127.0.0.1:3030".to_string(), "path/to/hosted/files".to_string());
// Copy the file to the hosting directory
let destination = Path::new(&file_server.base_path).join(file_name);
tokio::fs::copy(file_path, &destination).await?;
// Start the server if not already running
// This part needs synchronization in a real-world scenario
tokio::spawn(async move {
file_server.serve().await;
});
Ok(file_server.get_url(file_name))
}

View file

@ -1,16 +1,26 @@
mod models;
mod fcastsession;
mod transport;
use clap::{App, Arg, SubCommand};
use tiny_http::{Server, Response, ListenAddr, Header};
use tungstenite::stream::MaybeTlsStream;
use url::Url;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Mutex;
use std::thread::JoinHandle;
use std::{thread, fs};
use std::time::Instant;
use std::{io::Read, net::TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{ AtomicBool, Ordering};
use std::{sync::Arc, time::Duration};
use crate::fcastsession::Opcode;
use crate::models::SetVolumeMessage;
use crate::models::{SetVolumeMessage, SetSpeedMessage};
use crate::{models::{PlayMessage, SeekMessage}, fcastsession::FCastSession};
fn main() {
fn main() {
if let Err(e) = run() {
println!("Failed due to error: {}", e)
}
@ -19,6 +29,14 @@ fn main() {
fn run() -> Result<(), Box<dyn std::error::Error>> {
let app = App::new("Media Control")
.about("Control media playback")
.arg(Arg::with_name("connection_type")
.short('c')
.long("connection_type")
.value_name("CONNECTION_TYPE")
.help("Type of connection: tcp or ws (websocket)")
.required(false)
.default_value("tcp")
.takes_value(true))
.arg(Arg::with_name("host")
.short('h')
.long("host")
@ -32,7 +50,6 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
.value_name("PORT")
.help("The port to send the command to")
.required(false)
.default_value("46899")
.takes_value(true))
.subcommand(SubCommand::with_name("play")
.about("Play media")
@ -44,6 +61,13 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
.required(true)
.takes_value(true)
)
.arg(Arg::with_name("file")
.short('f')
.long("file")
.value_name("File")
.help("File content to play")
.required(false)
.takes_value(true))
.arg(Arg::with_name("url")
.short('u')
.long("url")
@ -69,6 +93,15 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
.default_value("0")
.takes_value(true)
)
.arg(Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(false)
.default_value("1")
.takes_value(true)
)
)
.subcommand(SubCommand::with_name("seek")
.about("Seek to a timestamp")
@ -92,6 +125,14 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
.value_name("VOLUME")
.help("Volume level (0-1)")
.required(true)
.takes_value(true)))
.subcommand(SubCommand::with_name("setspeed").about("Set the playback speed")
.arg(Arg::with_name("speed")
.short('s')
.long("speed")
.value_name("SPEED")
.help("Factor to multiply playback speed by")
.required(true)
.takes_value(true))
);
@ -102,35 +143,107 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
_ => return Err("Host is required.".into())
};
let connection_type = matches.value_of("connection_type").unwrap_or("tcp");
let port = match matches.value_of("port") {
Some(s) => s,
_ => return Err("Port is required.".into())
_ => match connection_type {
"tcp" => "46899",
"ws" => "46898",
_ => return Err("Unknown connection type, cannot automatically determine port.".into())
}
};
let local_ip: Option<IpAddr>;
let mut session = match connection_type {
"tcp" => {
println!("Connecting via TCP to host={} port={}...", host, port);
let stream = TcpStream::connect(format!("{}:{}", host, port))?;
local_ip = Some(stream.local_addr()?.ip());
FCastSession::new(stream)
},
"ws" => {
println!("Connecting via WebSocket to host={} port={}...", host, port);
let url = Url::parse(format!("ws://{}:{}", host, port).as_str())?;
let (stream, _) = tungstenite::connect(url)?;
local_ip = match stream.get_ref() {
MaybeTlsStream::Plain(ref stream) => Some(stream.local_addr()?.ip()),
_ => None
};
FCastSession::new(stream)
},
_ => return Err("Invalid connection type. Use 'tcp' or 'websocket'.".into()),
};
println!("Connecting to host={} port={}...", host, port);
let stream = TcpStream::connect(format!("{}:{}", host, port))?;
let mut session = FCastSession::new(&stream);
println!("Connection established.");
let mut join_handle: Option<JoinHandle<Result<(), String>>> = None;
if let Some(play_matches) = matches.subcommand_matches("play") {
let mut play_message = PlayMessage::new(
match play_matches.value_of("mime_type") {
Some(s) => s.to_string(),
_ => return Err("MIME type is required.".into())
},
match play_matches.value_of("url") {
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("content") {
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("timestamp") {
Some(s) => s.parse::<u64>().ok(),
_ => None
let file_path = play_matches.value_of("file");
let mut play_message = if let Some(file_path) = file_path {
match local_ip {
Some(lip) => {
let mime_type = match play_matches.value_of("mime_type") {
Some(s) => s.to_string(),
_ => return Err("MIME type is required.".into())
};
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
println!("Ctrl+C triggered, server will stop when onging request finishes...");
r.store(false, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler");
println!("Waiting for Ctrl+C...");
let result = host_file_and_get_url(&lip, file_path, &mime_type, &running)?;
let url = result.0;
join_handle = Some(result.1);
//TODO: Make this work
PlayMessage::new(
mime_type,
Some(url),
None,
match play_matches.value_of("timestamp") {
Some(s) => s.parse::<f64>().ok(),
_ => None
},
match play_matches.value_of("speed") {
Some(s) => s.parse::<f64>().ok(),
_ => None
}
)
},
_ => return Err("Local IP was not able to be resolved.".into())
}
);
} else {
PlayMessage::new(
match play_matches.value_of("mime_type") {
Some(s) => s.to_string(),
_ => return Err("MIME type is required.".into())
},
match play_matches.value_of("url") {
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("content") {
Some(s) => Some(s.to_string()),
_ => None
},
match play_matches.value_of("timestamp") {
Some(s) => s.parse::<f64>().ok(),
_ => None
},
match play_matches.value_of("speed") {
Some(s) => s.parse::<f64>().ok(),
_ => None
}
)
};
if play_message.content.is_none() && play_message.url.is_none() {
println!("Reading content from stdin...");
@ -167,6 +280,7 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
let r = running.clone();
ctrlc::set_handler(move || {
println!("Ctrl+C triggered...");
r.store(false, Ordering::SeqCst);
}).expect("Error setting Ctrl-C handler");
@ -182,12 +296,136 @@ fn run() -> Result<(), Box<dyn std::error::Error>> {
});
println!("Sent setvolume {:?}", setvolume_message);
session.send_message(Opcode::SetVolume, Some(setvolume_message))?;
} else if let Some(setspeed_matches) = matches.subcommand_matches("setspeed") {
let setspeed_message = SetSpeedMessage::new(match setspeed_matches.value_of("speed") {
Some(s) => s.parse::<f64>()?,
_ => return Err("Speed is required.".into())
});
println!("Sent setspeed {:?}", setspeed_message);
session.send_message(Opcode::SetSpeed, Some(setspeed_message))?;
} else {
println!("Invalid command. Use --help for more information.");
std::process::exit(1);
}
println!("Waiting on other threads...");
if let Some(v) = join_handle {
if let Err(_) = v.join() {
return Err("Failed to join thread.".into());
}
}
session.shutdown()?;
Ok(())
}
struct ServerState {
active_connections: usize,
last_request_time: Instant,
}
impl ServerState {
fn new() -> Self {
ServerState {
active_connections: 0,
last_request_time: Instant::now(),
}
}
}
fn host_file_and_get_url(local_ip: &IpAddr, file_path: &str, mime_type: &String, running: &Arc<AtomicBool>) -> Result<(String, thread::JoinHandle<Result<(), String>>), String> {
let server = {
let this = Server::http(format!("{}:0", local_ip));
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Failed to create server: {}", e))(e)),
}
}?;
let url = match server.server_addr() {
ListenAddr::IP(addr) => format!("http://{}:{}/", local_ip, addr.port()),
#[cfg(unix)]
ListenAddr::Unix(_) => return Err("Unix socket addresses are not supported.".to_string()),
};
println!("Server started on {}.", url);
let state = Mutex::new(ServerState::new());
let file_path_clone = file_path.to_owned();
let mime_type_clone = mime_type.to_owned();
let running_clone = running.to_owned();
let handle = thread::spawn(move || -> Result<(), String> {
loop {
if !running_clone.load(Ordering::SeqCst) {
println!("Server stopping...");
break;
}
let should_break = {
let state = {
let this = state.lock();
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Mutex error: {}", e))(e)),
}
}?;
state.active_connections == 0 && state.last_request_time.elapsed() > Duration::from_secs(300)
};
if should_break {
println!("No activity on server, closing...");
break;
}
match server.recv_timeout(Duration::from_secs(5)) {
Ok(Some(request)) => {
println!("Request received.");
let mut state = {
let this = state.lock();
match this {
Ok(t) => Ok(t),
Err(e) => Err((|e| format!("Mutex error: {}", e))(e)),
}
}?;
state.active_connections += 1;
state.last_request_time = Instant::now();
let file = {
let this = fs::File::open(&file_path_clone);
match this {
Ok(t) => Ok(t),
Err(e) => Err((|_| "Failed to open file.".to_string())(e)),
}
}?;
let content_type_header = {
let this = Header::from_str(format!("Content-Type: {}", mime_type_clone).as_str());
match this {
Ok(t) => Ok(t),
Err(e) => Err((|_| "Failed to open file.".to_string())(e)),
}
}?;
let response = Response::from_file(file)
.with_header(content_type_header);
if let Err(e) = request.respond(response) {
println!("Failed to respond to request: {}", e);
}
state.active_connections -= 1;
}
Ok(None) => {}
Err(e) => {
println!("Error receiving request: {}", e);
break;
}
}
}
Ok(())
});
Ok((url, handle))
}

View file

@ -5,12 +5,13 @@ pub struct PlayMessage {
pub container: String,
pub url: Option<String>,
pub content: Option<String>,
pub time: Option<u64>,
pub time: Option<f64>,
pub speed: Option<f64>
}
impl PlayMessage {
pub fn new(container: String, url: Option<String>, content: Option<String>, time: Option<u64>) -> Self {
Self { container, url, content, time }
pub fn new(container: String, url: Option<String>, content: Option<String>, time: Option<f64>, speed: Option<f64>) -> Self {
Self { container, url, content, time, speed }
}
}
@ -29,6 +30,7 @@ impl SeekMessage {
pub struct PlaybackUpdateMessage {
pub time: f64,
pub duration: f64,
pub speed: f64,
pub state: u8 //0 = None, 1 = Playing, 2 = Paused
}
@ -47,3 +49,19 @@ impl SetVolumeMessage {
Self { volume }
}
}
#[derive(Serialize, Debug)]
pub struct SetSpeedMessage {
pub speed: f64,
}
impl SetSpeedMessage {
pub fn new(speed: f64) -> Self {
Self { speed }
}
}
#[derive(Deserialize, Debug)]
pub struct PlaybackErrorMessage {
pub message: String,
}

View file

@ -0,0 +1,56 @@
use std::io::{Read, Write};
use std::net::TcpStream;
use tungstenite::Message;
use tungstenite::protocol::WebSocket;
pub trait Transport {
fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error>;
fn transport_shutdown(&mut self) -> Result<(), std::io::Error>;
}
impl Transport for TcpStream {
fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.read(buf)
}
fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
self.write_all(buf)
}
fn transport_shutdown(&mut self) -> Result<(), std::io::Error> {
self.shutdown(std::net::Shutdown::Both)
}
}
impl<T: Read + Write> Transport for WebSocket<T> {
fn transport_read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
match self.read() {
Ok(Message::Binary(data)) => {
let len = std::cmp::min(buf.len(), data.len());
buf[..len].copy_from_slice(&data[..len]);
Ok(len)
},
_ => Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid message type"))
}
}
fn transport_write(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
self.write(Message::Binary(buf.to_vec()))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
self.flush().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}
fn transport_shutdown(&mut self) -> Result<(), std::io::Error> {
self.close(None).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
loop {
match self.read() {
Ok(_) => continue,
Err(tungstenite::Error::ConnectionClosed) => break,
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
}
}
Ok(())
}
}

View file

@ -38,8 +38,18 @@ class DiscoveryService(private val _context: Context) {
fun stop() {
if (_nsdManager == null) return
_nsdManager?.unregisterService(_registrationListenerTcp)
_nsdManager?.unregisterService(_registrationListenerWs)
try {
_nsdManager?.unregisterService(_registrationListenerTcp)
} catch (e: Throwable) {
Log.e(TAG, "Failed to unregister TCP Listener.");
}
try {
_nsdManager?.unregisterService(_registrationListenerWs)
} catch (e: Throwable) {
Log.e(TAG, "Failed to unregister TCP Listener.");
}
_nsdManager = null
}
@ -60,4 +70,8 @@ class DiscoveryService(private val _context: Context) {
Log.e("DiscoveryService", "Service unregistration failed: errorCode=$errorCode")
}
}
companion object {
private const val TAG = "DiscoveryService"
}
}

View file

@ -6,9 +6,9 @@ import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.io.DataOutputStream
import java.io.OutputStream
import java.net.Socket
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.util.UUID
enum class SessionState {
Idle,
@ -26,7 +26,9 @@ enum class Opcode(val value: Byte) {
Seek(5),
PlaybackUpdate(6),
VolumeUpdate(7),
SetVolume(8)
SetVolume(8),
PlaybackError(9),
SetSpeed(10)
}
const val LENGTH_BYTES = 4
@ -38,6 +40,11 @@ class FCastSession(outputStream: OutputStream, private val _remoteSocketAddress:
private var _packetLength = 0
private var _state = SessionState.WaitingForLength
private var _outputStream: DataOutputStream? = DataOutputStream(outputStream)
val id = UUID.randomUUID()
fun sendPlaybackError(value: PlaybackErrorMessage) {
send(Opcode.PlaybackError, value)
}
fun sendPlaybackUpdate(value: PlaybackUpdateMessage) {
send(Opcode.PlaybackUpdate, value)
@ -82,7 +89,7 @@ class FCastSession(outputStream: OutputStream, private val _remoteSocketAddress:
Log.d(TAG, "Sent $size bytes: '$jsonString'.")
} catch (e: Throwable) {
Log.i(TAG, "Failed to send message.", e)
Log.i(TAG, "Failed to send message ${id}.", e)
throw e
}
}
@ -189,6 +196,7 @@ class FCastSession(outputStream: OutputStream, private val _remoteSocketAddress:
Opcode.Stop -> _service.onCastStop()
Opcode.Seek -> _service.onCastSeek(Json.decodeFromString(body!!))
Opcode.SetVolume -> _service.onSetVolume(Json.decodeFromString(body!!))
Opcode.SetSpeed -> _service.onSetSpeed(Json.decodeFromString(body!!))
else -> { }
}
} catch (e: Throwable) {

View file

@ -58,43 +58,30 @@ class NetworkService : Service() {
val onNewSession: (FCastSession) -> Unit = { session ->
_scope?.launch(Dispatchers.Main) {
Log.i(TAG, "On new session ${session.id}")
var encounteredError = false
while (!_stopped && !encounteredError) {
try {
val player = PlayerActivity.instance
val updateMessage = if (player != null) {
PlaybackUpdateMessage(
System.currentTimeMillis(),
player.currentPosition / 1000.0,
player.duration / 1000.0,
if (player.isPlaying) 1 else 2
)
} else {
PlaybackUpdateMessage(
System.currentTimeMillis(),
0.0,
0.0,
0
)
}
val updateMessage = generateUpdateMessage()
withContext(Dispatchers.IO) {
try {
session.sendPlaybackUpdate(updateMessage)
Log.i(TAG, "Update sent ${session.id}")
} catch (eSend: Throwable) {
Log.e(TAG, "Unhandled error sending update", eSend)
Log.e(TAG, "Unhandled error sending update ${session.id}", eSend)
encounteredError = true
return@withContext
}
Log.i(TAG, "Update sent")
}
} catch (eTimer: Throwable) {
Log.e(TAG, "Unhandled error on timer thread", eTimer)
Log.e(TAG, "Unhandled error on timer thread ${session.id}", eTimer)
} finally {
delay(1000)
}
}
Log.i(TAG, "Send loop closed ${session.id}")
}
}
@ -102,11 +89,11 @@ class NetworkService : Service() {
start()
}
_tcpListenerService = TcpListenerService(this, onNewSession).apply {
_tcpListenerService = TcpListenerService(this) { onNewSession(it) }.apply {
start()
}
_webSocketListenerService = WebSocketListenerService(this, onNewSession).apply {
_webSocketListenerService = WebSocketListenerService(this) { onNewSession(it) }.apply {
start()
}
@ -153,11 +140,82 @@ class NetworkService : Service() {
instance = null
}
fun generateUpdateMessage(): PlaybackUpdateMessage {
val player = PlayerActivity.instance
return if (player != null) {
PlaybackUpdateMessage(
System.currentTimeMillis(),
player.currentPosition / 1000.0,
player.duration / 1000.0,
if (player.isPlaying) 1 else 2,
player.speed.toDouble()
)
} else {
PlaybackUpdateMessage(
System.currentTimeMillis(),
0.0,
0.0,
0,
0.0
)
}
}
fun sendPlaybackError(error: String) {
val message = PlaybackErrorMessage(error)
_tcpListenerService?.forEachSession { session ->
_scope?.launch(Dispatchers.IO) {
try {
session.sendPlaybackError(message)
Log.i(TAG, "Playback error sent ${session.id}")
} catch (e: Throwable) {
Log.w(TAG, "Failed to send playback error", e)
}
}
}
_webSocketListenerService?.forEachSession { session ->
_scope?.launch(Dispatchers.IO) {
try {
session.sendPlaybackError(message)
Log.i(TAG, "Playback error sent ${session.id}")
} catch (e: Throwable) {
Log.w(TAG, "Failed to send playback error", e)
}
}
}
}
fun sendPlaybackUpdate(message: PlaybackUpdateMessage) {
_tcpListenerService?.forEachSession { session ->
_scope?.launch(Dispatchers.IO) {
try {
session.sendPlaybackUpdate(message)
Log.i(TAG, "Playback update sent ${session.id}")
} catch (e: Throwable) {
Log.w(TAG, "Failed to send playback update", e)
}
}
}
_webSocketListenerService?.forEachSession { session ->
_scope?.launch(Dispatchers.IO) {
try {
session.sendPlaybackUpdate(message)
Log.i(TAG, "Playback update sent ${session.id}")
} catch (e: Throwable) {
Log.w(TAG, "Failed to send playback update", e)
}
}
}
}
fun sendCastVolumeUpdate(value: VolumeUpdateMessage) {
_tcpListenerService?.forEachSession { session ->
_scope?.launch {
_scope?.launch(Dispatchers.IO) {
try {
session.sendVolumeUpdate(value)
Log.i(TAG, "Volume update sent ${session.id}")
} catch (e: Throwable) {
Log.w(TAG, "Failed to send volume update", e)
}
@ -165,9 +223,10 @@ class NetworkService : Service() {
}
_webSocketListenerService?.forEachSession { session ->
_scope?.launch {
_scope?.launch(Dispatchers.IO) {
try {
session.sendVolumeUpdate(value)
Log.i(TAG, "Volume update sent ${session.id}")
} catch (e: Throwable) {
Log.w(TAG, "Failed to send volume update", e)
}
@ -178,7 +237,7 @@ class NetworkService : Service() {
fun onCastPlay(playMessage: PlayMessage) {
Log.i(TAG, "onPlay")
_scope?.launch {
_scope?.launch(Dispatchers.Main) {
try {
if (PlayerActivity.instance == null) {
val i = Intent(this@NetworkService, PlayerActivity::class.java)
@ -187,6 +246,7 @@ class NetworkService : Service() {
i.putExtra("url", playMessage.url)
i.putExtra("content", playMessage.content)
i.putExtra("time", playMessage.time)
i.putExtra("speed", playMessage.speed)
if (activityCount > 0) {
startActivity(i)
@ -219,7 +279,7 @@ class NetworkService : Service() {
fun onCastPause() {
Log.i(TAG, "onPause")
_scope?.launch {
_scope?.launch(Dispatchers.Main) {
try {
PlayerActivity.instance?.pause()
} catch (e: Throwable) {
@ -231,7 +291,7 @@ class NetworkService : Service() {
fun onCastResume() {
Log.i(TAG, "onResume")
_scope?.launch {
_scope?.launch(Dispatchers.Main) {
try {
PlayerActivity.instance?.resume()
} catch (e: Throwable) {
@ -243,7 +303,7 @@ class NetworkService : Service() {
fun onCastStop() {
Log.i(TAG, "onStop")
_scope?.launch {
_scope?.launch(Dispatchers.Main) {
try {
PlayerActivity.instance?.finish()
} catch (e: Throwable) {
@ -255,7 +315,7 @@ class NetworkService : Service() {
fun onCastSeek(seekMessage: SeekMessage) {
Log.i(TAG, "onSeek")
_scope?.launch {
_scope?.launch(Dispatchers.Main) {
try {
PlayerActivity.instance?.seek(seekMessage)
} catch (e: Throwable) {
@ -267,7 +327,7 @@ class NetworkService : Service() {
fun onSetVolume(setVolumeMessage: SetVolumeMessage) {
Log.i(TAG, "onSetVolume")
_scope?.launch {
_scope?.launch(Dispatchers.Main) {
try {
PlayerActivity.instance?.setVolume(setVolumeMessage)
} catch (e: Throwable) {
@ -276,6 +336,18 @@ class NetworkService : Service() {
}
}
fun onSetSpeed(setSpeedMessage: SetSpeedMessage) {
Log.i(TAG, "setSpeedMessage")
_scope?.launch(Dispatchers.Main) {
try {
PlayerActivity.instance?.setSpeed(setSpeedMessage)
} catch (e: Throwable) {
Log.e(TAG, "Failed to seek", e)
}
}
}
companion object {
private const val CHANNEL_ID = "NetworkListenerServiceChannel"
private const val NOTIFICATION_ID = 1

View file

@ -7,7 +7,8 @@ data class PlayMessage(
val container: String,
val url: String? = null,
val content: String? = null,
val time: Long? = null
val time: Double? = null,
val speed: Double? = null
)
@Serializable
@ -20,7 +21,8 @@ data class PlaybackUpdateMessage(
val generationTime: Long,
val time: Double,
val duration: Double,
val state: Int
val state: Int,
val speed: Double
)
@Serializable
@ -29,6 +31,16 @@ data class VolumeUpdateMessage(
val volume: Double
)
@Serializable
data class PlaybackErrorMessage(
val message: String
)
@Serializable
data class SetSpeedMessage(
val speed: Double
)
@Serializable
data class SetVolumeMessage(
val volume: Double

View file

@ -39,6 +39,7 @@ class PlayerActivity : AppCompatActivity() {
private var _wasPlaying = false
val currentPosition get() = _exoPlayer.currentPosition
val speed get() = _exoPlayer.playbackParameters.speed
val duration get() = _exoPlayer.duration
val isPlaying get() = _exoPlayer.isPlaying
@ -82,7 +83,15 @@ class PlayerActivity : AppCompatActivity() {
setStatus(true, null)
}
//TODO: Send playback update
NetworkService.instance?.generateUpdateMessage()?.let {
_scope.launch(Dispatchers.IO) {
try {
NetworkService.instance?.sendPlaybackUpdate(it)
} catch (e: Throwable) {
Log.e(TAG, "Unhandled error sending playback update", e)
}
}
}
}
override fun onPlayerError(error: PlaybackException) {
@ -105,9 +114,16 @@ class PlayerActivity : AppCompatActivity() {
}
}
//TODO: Send error notification
val fullMessage = getFullExceptionMessage(error)
setStatus(false, fullMessage)
setStatus(false, getFullExceptionMessage(error))
_scope.launch(Dispatchers.IO) {
try {
NetworkService.instance?.sendPlaybackError(fullMessage)
} catch (e: Throwable) {
Log.e(TAG, "Unhandled error sending playback error", e)
}
}
}
override fun onVolumeChanged(volume: Float) {
@ -118,8 +134,19 @@ class PlayerActivity : AppCompatActivity() {
} catch (e: Throwable) {
Log.e(TAG, "Unhandled error sending volume update", e)
}
}
}
Log.i(TAG, "Update sent")
override fun onPlaybackParametersChanged(playbackParameters: PlaybackParameters) {
super.onPlaybackParametersChanged(playbackParameters)
NetworkService.instance?.generateUpdateMessage()?.let {
_scope.launch(Dispatchers.IO) {
try {
NetworkService.instance?.sendPlaybackUpdate(it)
} catch (e: Throwable) {
Log.e(TAG, "Unhandled error sending playback update", e)
}
}
}
}
}
@ -142,7 +169,7 @@ class PlayerActivity : AppCompatActivity() {
val trackSelector = DefaultTrackSelector(this)
trackSelector.parameters = trackSelector.parameters
.buildUpon()
.setPreferredTextLanguage("en")
.setPreferredTextLanguage("df")
.setSelectUndeterminedTextLanguage(true)
.build()
@ -165,9 +192,10 @@ class PlayerActivity : AppCompatActivity() {
val container = intent.getStringExtra("container") ?: ""
val url = intent.getStringExtra("url")
val content = intent.getStringExtra("content")
val time = intent.getLongExtra("time", 0L)
val time = intent.getDoubleExtra("time", 0.0)
val speed = intent.getDoubleExtra("speed", 1.0)
play(PlayMessage(container, url, content, time))
play(PlayMessage(container, url, content, time, speed))
instance = this
NetworkService.activityCount++
@ -302,9 +330,10 @@ class PlayerActivity : AppCompatActivity() {
}
_exoPlayer.setMediaSource(mediaSource)
_exoPlayer.setPlaybackSpeed(playMessage.speed?.toFloat() ?: 1.0f)
if (playMessage.time != null) {
_exoPlayer.seekTo(playMessage.time * 1000)
_exoPlayer.seekTo((playMessage.time * 1000).toLong())
}
setStatus(true, null)
@ -326,6 +355,10 @@ class PlayerActivity : AppCompatActivity() {
_exoPlayer.seekTo((seekMessage.time * 1000.0).toLong())
}
fun setSpeed(setSpeedMessage: SetSpeedMessage) {
_exoPlayer.setPlaybackSpeed(setSpeedMessage.speed.toFloat())
}
fun setVolume(setVolumeMessage: SetVolumeMessage) {
_exoPlayer.volume = setVolumeMessage.volume.toFloat()
}

View file

@ -1,11 +1,6 @@
package com.futo.fcast.receiver
import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import java.io.BufferedInputStream
import java.net.ServerSocket
import java.net.Socket

View file

@ -6,44 +6,54 @@ import org.java_websocket.handshake.ClientHandshake
import org.java_websocket.server.WebSocketServer
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.IdentityHashMap
class WebSocketListenerService(private val _networkService: NetworkService, private val _onNewSession: (session: FCastSession) -> Unit) : WebSocketServer(InetSocketAddress(PORT)) {
private var _sessions = IdentityHashMap<WebSocket, FCastSession>()
private val _sockets = arrayListOf<WebSocket>()
override fun onOpen(conn: WebSocket, handshake: ClientHandshake) {
val session = FCastSession(WebSocketOutputStream(conn), conn.remoteSocketAddress, _networkService)
synchronized(_sessions) {
_sessions[conn] = session
conn.setAttachment(session)
synchronized(_sockets) {
_sockets.add(conn)
}
_onNewSession(session)
Log.i(TAG, "New connection from ${conn.remoteSocketAddress}")
Log.i(TAG, "New connection from ${conn.remoteSocketAddress} ${session.id}")
}
override fun onClose(conn: WebSocket, code: Int, reason: String, remote: Boolean) {
synchronized(_sessions) {
_sessions.remove(conn)
synchronized(_sockets) {
_sockets.remove(conn)
}
Log.i(TAG, "Closed connection from ${conn.remoteSocketAddress}")
Log.i(TAG, "Closed connection from ${conn.remoteSocketAddress} ${conn.getAttachment<FCastSession>().id}")
}
override fun onMessage(conn: WebSocket?, message: String?) {
if (conn == null) {
Log.i(TAG, "Conn is null, ignore onMessage")
return
}
Log.i(TAG, "Received string message, but not processing: $message")
}
override fun onMessage(conn: WebSocket?, message: ByteBuffer?) {
if (conn == null) {
Log.i(TAG, "Conn is null, ignore onMessage")
return
}
if (message == null) {
Log.i(TAG, "Received byte message null")
return
}
Log.i(TAG, "Received byte message (offset = ${message.arrayOffset()}, size = ${message.remaining()})")
synchronized(_sessions) {
_sessions[conn]?.processBytes(message)
}
val session = conn.getAttachment<FCastSession>()
Log.i(TAG, "Received byte message (offset = ${message.arrayOffset()}, size = ${message.remaining()}, id = ${session.id})")
session.processBytes(message)
}
override fun onError(conn: WebSocket?, ex: Exception) {
@ -55,9 +65,9 @@ class WebSocketListenerService(private val _networkService: NetworkService, priv
}
fun forEachSession(handler: (FCastSession) -> Unit) {
synchronized(_sessions) {
for (pair in _sessions) {
handler(pair.value)
synchronized(_sockets) {
_sockets.forEach {
handler(it.getAttachment())
}
}
}

View file

@ -1,6 +1,6 @@
import net = require('net');
import { EventEmitter } from 'node:events';
import { PlaybackUpdateMessage, PlayMessage, SeekMessage, SetVolumeMessage, VolumeUpdateMessage } from './Packets';
import { PlaybackErrorMessage, PlaybackUpdateMessage, PlayMessage, SeekMessage, SetSpeedMessage, SetVolumeMessage, VolumeUpdateMessage } from './Packets';
import { WebSocket } from 'ws';
enum SessionState {
@ -19,7 +19,9 @@ enum Opcode {
Seek = 5,
PlaybackUpdate = 6,
VolumeUpdate = 7,
SetVolume = 8
SetVolume = 8,
PlaybackError = 9,
SetSpeed = 10
};
const LENGTH_BYTES = 4;
@ -40,6 +42,10 @@ export class FCastSession {
this.state = SessionState.WaitingForLength;
}
sendPlaybackError(value: PlaybackErrorMessage) {
this.send(Opcode.PlaybackError, value);
}
sendPlaybackUpdate(value: PlaybackUpdateMessage) {
this.send(Opcode.PlaybackUpdate, value);
}
@ -178,6 +184,9 @@ export class FCastSession {
case Opcode.SetVolume:
this.emitter.emit("setvolume", JSON.parse(body) as SetVolumeMessage);
break;
case Opcode.SetSpeed:
this.emitter.emit("setspeed", JSON.parse(body) as SetSpeedMessage);
break;
}
} catch (e) {
console.warn(`Error handling packet from.`, e);

View file

@ -1,7 +1,7 @@
import { BrowserWindow, ipcMain, IpcMainEvent, nativeImage, Tray, Menu, dialog } from 'electron';
import path = require('path');
import { TcpListenerService } from './TcpListenerService';
import { PlaybackUpdateMessage, VolumeUpdateMessage } from './Packets';
import { PlaybackErrorMessage, PlaybackUpdateMessage, VolumeUpdateMessage } from './Packets';
import { DiscoveryService } from './DiscoveryService';
import { Updater } from './Updater';
import { WebSocketListenerService } from './WebSocketListenerService';
@ -137,8 +137,13 @@ export default class Main {
l.emitter.on("seek", (message) => Main.playerWindow?.webContents?.send("seek", message));
l.emitter.on("setvolume", (message) => Main.playerWindow?.webContents?.send("setvolume", message));
l.emitter.on("setspeed", (message) => Main.playerWindow?.webContents?.send("setspeed", message));
l.start();
ipcMain.on('send-playback-error', (event: IpcMainEvent, value: PlaybackErrorMessage) => {
l.sendPlaybackError(value);
});
ipcMain.on('send-playback-update', (event: IpcMainEvent, value: PlaybackUpdateMessage) => {
l.sendPlaybackUpdate(value);
});
@ -198,6 +203,8 @@ export default class Main {
Main.mainWindow = new BrowserWindow({
fullscreen: true,
autoHideMenuBar: true,
minWidth: 500,
minHeight: 920,
webPreferences: {
preload: path.join(__dirname, 'main/preload.js')
}

View file

@ -3,7 +3,8 @@ export class PlayMessage {
public container: String,
public url: String = null,
public content: String = null,
public time: number = null
public time: number = null,
public speed: number = null
) {}
}
@ -18,7 +19,14 @@ export class PlaybackUpdateMessage {
public generationTime: number,
public time: number,
public duration: number,
public state: number
public state: number,
public speed: number
) {}
}
export class PlaybackErrorMessage {
constructor(
public message: String
) {}
}
@ -34,3 +42,9 @@ export class SetVolumeMessage {
public volume: number,
) {}
}
export class SetSpeedMessage {
constructor(
public speed: number,
) {}
}

View file

@ -1,7 +1,7 @@
import net = require('net');
import { FCastSession } from './FCastSession';
import { EventEmitter } from 'node:events';
import { PlaybackUpdateMessage, PlayMessage, SeekMessage, SetVolumeMessage, VolumeUpdateMessage } from './Packets';
import { PlaybackErrorMessage, PlaybackUpdateMessage, PlayMessage, SeekMessage, SetSpeedMessage, SetVolumeMessage, VolumeUpdateMessage } from './Packets';
import { dialog } from 'electron';
import Main from './Main';
@ -33,6 +33,19 @@ export class TcpListenerService {
server.close();
}
sendPlaybackError(value: PlaybackErrorMessage) {
console.info("Sending playback error.", value);
this.sessions.forEach(session => {
try {
session.sendPlaybackError(value);
} catch (e) {
console.warn("Failed to send error.", e);
session.close();
}
});
}
sendPlaybackUpdate(value: PlaybackUpdateMessage) {
console.info("Sending playback update.", value);
@ -89,6 +102,7 @@ export class TcpListenerService {
session.emitter.on("stop", () => { this.emitter.emit("stop") });
session.emitter.on("seek", (body: SeekMessage) => { this.emitter.emit("seek", body) });
session.emitter.on("setvolume", (body: SetVolumeMessage) => { this.emitter.emit("setvolume", body) });
session.emitter.on("setspeed", (body: SetSpeedMessage) => { this.emitter.emit("setspeed", body) });
this.sessions.push(session);
socket.on("error", (err) => {

View file

@ -1,7 +1,7 @@
import net = require('net');
import { FCastSession } from './FCastSession';
import { EventEmitter } from 'node:events';
import { PlaybackUpdateMessage, PlayMessage, SeekMessage, SetVolumeMessage, VolumeUpdateMessage } from './Packets';
import { PlaybackErrorMessage, PlaybackUpdateMessage, PlayMessage, SeekMessage, SetSpeedMessage, SetVolumeMessage, VolumeUpdateMessage } from './Packets';
import { dialog } from 'electron';
import Main from './Main';
import { WebSocket, WebSocketServer } from 'ws';
@ -33,6 +33,19 @@ export class WebSocketListenerService {
server.close();
}
sendPlaybackError(value: PlaybackErrorMessage) {
console.info("Sending playback error.", value);
this.sessions.forEach(session => {
try {
session.sendPlaybackError(value);
} catch (e) {
console.warn("Failed to send error.", e);
session.close();
}
});
}
sendPlaybackUpdate(value: PlaybackUpdateMessage) {
console.info("Sending playback update.", value);
@ -89,6 +102,7 @@ export class WebSocketListenerService {
session.emitter.on("stop", () => { this.emitter.emit("stop") });
session.emitter.on("seek", (body: SeekMessage) => { this.emitter.emit("seek", body) });
session.emitter.on("setvolume", (body: SetVolumeMessage) => { this.emitter.emit("setvolume", body) });
session.emitter.on("setspeed", (body: SetSpeedMessage) => { this.emitter.emit("setspeed", body) });
this.sessions.push(session);
socket.on("error", (err) => {

View file

@ -26,7 +26,7 @@
</div>
<div id="automatic-discovery">Automatic discovery is available via mDNS</div>
<div id="qr-code"></div>
<div id="scan-to-connect">Scan to connect</div>
<div id="scan-to-connect" style="font-weight: bold;">Scan to connect</div>
</div>
<!--<div id="update-dialog">There is an update available. Do you wish to update?</div>
<div id="update-button">Update</div>
@ -34,6 +34,8 @@
<div id="update-spinner" class="lds-ring"><div></div><div></div><div></div><div></div></div>
<div id="progress-text"></div>
</div>-->
<div id="window-can-be-closed" style="color: #666666; position: absolute; bottom: 0; margin-bottom: 20px;">App will continue to run as tray app when the window is closed</div>
</div>
</div>
<script>window.HELP_IMPROVE_VIDEOJS = false;</script>

View file

@ -48,10 +48,14 @@ body, html {
background-color: white;
}
#update-dialog, #waiting-for-connection, #manual-connection-info, #ips, #automatic-discovery, #scan-to-connect {
#update-dialog, #waiting-for-connection, #ips, #automatic-discovery, #scan-to-connect {
margin-top: 20px;
}
#spinner {
padding: 20px;
}
#update-button {
background: blue;
padding: 10px 28px;

View file

@ -3,11 +3,13 @@ const { contextBridge, ipcRenderer } = require('electron');
contextBridge.exposeInMainWorld('electronAPI', {
toggleFullScreen: () => ipcRenderer.send('toggle-full-screen'),
exitFullScreen: () => ipcRenderer.send('exit-full-screen'),
sendPlaybackError: (error) => ipcRenderer.send('send-playback-error', error),
sendPlaybackUpdate: (update) => ipcRenderer.send('send-playback-update', update),
sendVolumeUpdate: (update) => ipcRenderer.send('send-volume-update', update),
onPlay: (callback) => ipcRenderer.on("play", callback),
onPause: (callback) => ipcRenderer.on("pause", callback),
onResume: (callback) => ipcRenderer.on("resume", callback),
onSeek: (callback) => ipcRenderer.on("seek", callback),
onSetVolume: (callback) => ipcRenderer.on("setvolume", callback)
onSetVolume: (callback) => ipcRenderer.on("setvolume", callback),
onSetSpeed: (callback) => ipcRenderer.on("setspeed", callback)
});

View file

@ -17,30 +17,45 @@ const player = videojs("video-player", options, function onPlayerReady() {
player.on("pause", () => { window.electronAPI.sendPlaybackUpdate({
generationTime: Date.now(),
time: Math.round(player.currentTime()),
duration: Math.round(player.duration()),
state: 2
time: player.currentTime(),
duration: player.duration(),
state: 2,
speed: player.playbackRate()
})});
player.on("play", () => { window.electronAPI.sendPlaybackUpdate({
generationTime: Date.now(),
time: Math.round(player.currentTime()),
duration: Math.round(player.duration()),
state: 1
time: player.currentTime(),
duration: player.duration(),
state: 1,
speed: player.playbackRate()
})});
player.on("seeked", () => { window.electronAPI.sendPlaybackUpdate({
generationTime: Date.now(),
time: Math.round(player.currentTime()),
duration: Math.round(player.duration()),
state: player.paused() ? 2 : 1 })
});
time: player.currentTime(),
duration: player.duration(),
state: player.paused() ? 2 : 1,
speed: player.playbackRate()
})});
player.on("volumechange", () => { window.electronAPI.sendVolumeUpdate({
generationTime: Date.now(),
volume: player.volume()
})});
player.on("ratechange", () => { window.electronAPI.sendPlaybackUpdate({
generationTime: Date.now(),
time: player.currentTime(),
duration: player.duration(),
state: player.paused() ? 2 : 1,
speed: player.playbackRate()
})});
player.on('error', () => { window.electronAPI.sendPlaybackError({
message: JSON.stringify(player.error())
})});
window.electronAPI.onPlay((_event, value) => {
console.log("Handle play message renderer", value);
@ -50,11 +65,22 @@ window.electronAPI.onPlay((_event, value) => {
player.src({ type: value.container, src: value.url });
}
player.play();
const onLoadedMetadata = () => {
if (value.time) {
player.currentTime(value.time);
}
if (value.time) {
player.currentTime(value.time);
}
if (value.speed) {
player.playbackRate(value.speed);
} else {
player.playbackRate(1.0);
}
player.off('loadedmetadata', onLoadedMetadata);
};
player.on('loadedmetadata', onLoadedMetadata);
player.play();
});
window.electronAPI.onPause((_event) => {
@ -77,12 +103,18 @@ window.electronAPI.onSetVolume((_event, value) => {
player.volume(Math.min(1.0, Math.max(0.0, value.volume)));
});
window.electronAPI.onSetSpeed((_event, value) => {
console.log("Handle setSpeed");
player.playbackRate(value.speed);
});
setInterval(() => {
window.electronAPI.sendPlaybackUpdate({
generationTime: Date.now(),
time: Math.round(player.currentTime()),
duration: Math.round(player.duration()),
state: player.paused() ? 2 : 1
time: (player.currentTime()),
duration: (player.duration()),
state: player.paused() ? 2 : 1,
speed: player.playbackRate()
});
}, 1000);
@ -171,7 +203,7 @@ player.ready(() => {
textTracks.addEventListener("change", function () {
console.log("Text tracks changed", textTracks);
for (let i = 0; i < textTracks.length; i++) {
if (textTracks[i].language === "en" && textTracks[i].mode !== "showing") {
if (textTracks[i].language === "df" && textTracks[i].mode !== "showing") {
textTracks[i].mode = "showing";
}
}
@ -180,7 +212,7 @@ player.ready(() => {
player.on('loadedmetadata', function () {
console.log("Metadata loaded", textTracks);
for (let i = 0; i < textTracks.length; i++) {
if (textTracks[i].language === "en" && textTracks[i].mode !== "showing") {
if (textTracks[i].language === "df" && textTracks[i].mode !== "showing") {
textTracks[i].mode = "showing";
}
}