1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-06-24 21:25:23 +00:00

Receivers: Fixed logic issues with connection monitor

This commit is contained in:
Michael Hollister 2025-04-29 11:57:25 -05:00
parent 270a998e18
commit 9b47627801
9 changed files with 39 additions and 57 deletions

View file

@ -13,26 +13,30 @@ export function setUiUpdateCallbacks(callbacks: any) {
}
// Window might be re-created while devices are still connected
window.targetAPI.onPing((_event, value: any) => {
function onPingPong(value: any) {
if (value) {
heartbeatRetries[value.id] = 0;
heartbeatRetries[value.sessionId] = 0;
if (!connections.includes(value.id)) {
connections.push(value.id);
uiUpdateCallbacks.onConnect(connections, value.id);
if (!connections.includes(value.sessionId)) {
connections.push(value.sessionId);
uiUpdateCallbacks.onConnect(connections, value.sessionId);
}
}
});
}
window.targetAPI.onPing((_event, value: any) => onPingPong(value));
window.targetAPI.onPong((_event, value: any) => onPingPong(value));
window.targetAPI.onConnect((_event, value: any) => {
connections.push(value.id);
console.log(`Device connected: ${JSON.stringify(value)}`);
connections.push(value.sessionId);
uiUpdateCallbacks.onConnect(connections, value);
});
window.targetAPI.onDisconnect((_event, value: any) => {
console.log(`Device disconnected: ${JSON.stringify(value)}`);
const index = connections.indexOf(value.id);
const index = connections.indexOf(value.sessionId);
if (index != -1) {
connections.splice(index, 1);
uiUpdateCallbacks.onDisconnect(connections, value.id);
uiUpdateCallbacks.onDisconnect(connections, value.sessionId);
}
});
@ -40,13 +44,13 @@ setInterval(() => {
if (connections.length > 0) {
window.targetAPI.sendSessionMessage(Opcode.Ping, null);
for (const session of connections) {
if (heartbeatRetries[session] > 3) {
console.warn(`Could not ping device with connection id ${session}. Disconnecting...`);
window.targetAPI.disconnectDevice(session);
for (const sessionId of connections) {
if (heartbeatRetries[sessionId] > 3) {
console.warn(`Could not ping device with connection id ${sessionId}. Disconnecting...`);
window.targetAPI.disconnectDevice(sessionId);
}
heartbeatRetries[session] = heartbeatRetries[session] === undefined ? 1 : heartbeatRetries[session] + 1;
heartbeatRetries[sessionId] = heartbeatRetries[sessionId] === undefined ? 1 : heartbeatRetries[sessionId] + 1;
}
}
}, connectionPingTimeout);

View file

@ -3,6 +3,7 @@ import * as log4js from "modules/log4js";
import { EventEmitter } from 'events';
import { Opcode, PlaybackErrorMessage, PlaybackUpdateMessage, PlayMessage, SeekMessage, SetSpeedMessage, SetVolumeMessage, VersionMessage, VolumeUpdateMessage } from 'common/Packets';
import { WebSocket } from 'modules/ws';
import { v4 as uuidv4 } from 'modules/uuid';
const logger = log4js.getLogger();
enum SessionState {
@ -16,6 +17,7 @@ const LENGTH_BYTES = 4;
const MAXIMUM_PACKET_LENGTH = 32000;
export class FCastSession {
public sessionId: string;
buffer: Buffer = Buffer.alloc(MAXIMUM_PACKET_LENGTH);
bytesRead = 0;
packetLength = 0;
@ -25,6 +27,7 @@ export class FCastSession {
emitter = new EventEmitter();
constructor(socket: net.Socket | WebSocket, writer: (data: Buffer) => void) {
this.sessionId = uuidv4();
this.socket = socket;
this.writer = writer;
this.state = SessionState.WaitingForLength;
@ -220,7 +223,7 @@ export class FCastSession {
this.emitter.on("setvolume", (body: SetVolumeMessage) => { emitter.emit("setvolume", body) });
this.emitter.on("setspeed", (body: SetSpeedMessage) => { emitter.emit("setspeed", body) });
this.emitter.on("version", (body: VersionMessage) => { emitter.emit("version", body) });
this.emitter.on("ping", () => { emitter.emit("ping") });
this.emitter.on("pong", () => { emitter.emit("pong") });
this.emitter.on("ping", () => { emitter.emit("ping", { sessionId: this.sessionId }) });
this.emitter.on("pong", () => { emitter.emit("pong", { sessionId: this.sessionId }) });
}
}

View file

@ -3,7 +3,6 @@ import { FCastSession } from 'common/FCastSession';
import { Opcode } from 'common/Packets';
import { EventEmitter } from 'events';
import { Main, errorHandler } from 'src/Main';
import { v4 as uuidv4 } from 'modules/uuid';
export class TcpListenerService {
public static PORT = 46899;
@ -47,8 +46,8 @@ export class TcpListenerService {
});
}
disconnect(connectionId: string) {
this.sessionMap[connectionId]?.socket.destroy();
disconnect(sessionId: string) {
this.sessionMap[sessionId]?.socket.destroy();
}
private async handleServerError(err: NodeJS.ErrnoException) {
@ -61,9 +60,7 @@ export class TcpListenerService {
const session = new FCastSession(socket, (data) => socket.write(data));
session.bindEvents(this.emitter);
this.sessions.push(session);
const connectionId = uuidv4();
this.sessionMap[connectionId] = session;
this.sessionMap[session.sessionId] = session;
socket.on("error", (err) => {
Main.logger.warn(`Error from ${socket.remoteAddress}:${socket.remotePort}.`, err);
@ -84,23 +81,10 @@ export class TcpListenerService {
if (index != -1) {
this.sessions.splice(index, 1);
}
if (!this.sessions.some(e => e.socket.remoteAddress === socket.remoteAddress)) {
this.emitter.emit('disconnect', { id: connectionId, type: 'tcp', data: { address: socket.remoteAddress, port: socket.remotePort }});
}
this.emitter.removeListener('ping', pingListener);
this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'tcp', data: { address: socket.remoteAddress, port: socket.remotePort }});
});
// Sometimes the sender may reconnect under a different port, so suppress connect/disconnect event emission
if (!this.sessions.some(e => e.socket.remoteAddress === socket.remoteAddress)) {
this.emitter.emit('connect', { id: connectionId, type: 'tcp', data: { address: socket.remoteAddress, port: socket.remotePort }});
}
const pingListener = (message: any) => {
if (!message) {
this.emitter.emit('ping', { id: connectionId });
}
}
this.emitter.prependListener('ping', pingListener);
this.emitter.emit('connect', { sessionId: session.sessionId, type: 'tcp', data: { address: socket.remoteAddress, port: socket.remotePort }});
try {
Main.logger.info('Sending version');
session.send(Opcode.Version, {version: 2});

View file

@ -3,7 +3,6 @@ import { Opcode } from 'common/Packets';
import { EventEmitter } from 'events';
import { WebSocket, WebSocketServer } from 'modules/ws';
import { Main, errorHandler } from 'src/Main';
import { v4 as uuidv4 } from 'modules/uuid';
export class WebSocketListenerService {
public static PORT = 46898;
@ -46,8 +45,8 @@ export class WebSocketListenerService {
});
}
disconnect(connectionId: string) {
this.sessionMap[connectionId]?.close();
disconnect(sessionId: string) {
this.sessionMap[sessionId]?.close();
}
private async handleServerError(err: NodeJS.ErrnoException) {
@ -58,11 +57,9 @@ export class WebSocketListenerService {
Main.logger.info('New WebSocket connection');
const session = new FCastSession(socket, (data) => socket.send(data));
const connectionId = uuidv4();
this.sessionMap[connectionId] = session;
session.bindEvents(this.emitter);
this.sessions.push(session);
this.sessionMap[session.sessionId] = session;
socket.on("error", (err) => {
Main.logger.warn(`Error.`, err);
@ -89,18 +86,10 @@ export class WebSocketListenerService {
if (index != -1) {
this.sessions.splice(index, 1);
}
this.emitter.emit('disconnect', { id: connectionId, type: 'ws', data: { url: socket.url }});
this.emitter.removeListener('ping', pingListener);
this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'ws', data: { url: socket.url }});
});
this.emitter.emit('connect', { id: connectionId, type: 'ws', data: { url: socket.url }});
const pingListener = (message: any) => {
if (!message) {
this.emitter.emit('ping', { id: connectionId });
}
}
this.emitter.prependListener('ping', pingListener);
this.emitter.emit('connect', { sessionId: session.sessionId, type: 'ws', data: { url: socket.url }});
try {
Main.logger.info('Sending version');
session.send(Opcode.Version, {version: 2});

View file

@ -30,6 +30,7 @@ if (TARGET === 'electron') {
onConnect: (callback: any) => electronAPI.ipcRenderer.on('connect', callback),
onDisconnect: (callback: any) => electronAPI.ipcRenderer.on('disconnect', callback),
onPing: (callback: any) => electronAPI.ipcRenderer.on('ping', callback),
onPong: (callback: any) => electronAPI.ipcRenderer.on('pong', callback),
});
// @ts-ignore

View file

@ -16,13 +16,11 @@ window.addEventListener('resize', (event) => calculateQRCodeWidth());
connectionMonitor.setUiUpdateCallbacks({
onConnect: (connections: string[], connectionInfo: any) => {
console.log(`Device connected: ${JSON.stringify(connectionInfo)}`);
connectionStatusText.textContent = connections.length > 1 ? 'Multiple devices connected:\r\n Ready to cast' : 'Connected: Ready to cast';
connectionStatusSpinner.style.display = 'none';
connectionStatusCheck.style.display = 'inline-block';
},
onDisconnect: (connections: string[], connectionInfo: any) => {
console.log(`Device disconnected: ${JSON.stringify(connectionInfo)}`);
if (connections.length === 0) {
connectionStatusText.textContent = 'Waiting for a connection';
connectionStatusSpinner.style.display = 'inline-block';

View file

@ -33,6 +33,7 @@ if (TARGET === 'electron') {
onConnect: (callback: any) => electronAPI.ipcRenderer.on('connect', callback),
onDisconnect: (callback: any) => electronAPI.ipcRenderer.on('disconnect', callback),
onPing: (callback: any) => electronAPI.ipcRenderer.on('ping', callback),
onPong: (callback: any) => electronAPI.ipcRenderer.on('pong', callback),
});
// @ts-ignore

View file

@ -334,11 +334,9 @@ function onPlay(_event, value: PlayMessage) {
connectionMonitor.setUiUpdateCallbacks({
onConnect: (connections: string[], connectionInfo: any) => {
console.log(`Device connected: ${JSON.stringify(connectionInfo)}`);
toast('Device connected', ToastIcon.INFO);
},
onDisconnect: (connections: string[], connectionInfo: any) => {
console.log(`Device disconnected: ${JSON.stringify(connectionInfo)}`);
toast('Device disconnected. If you experience playback issues, please reconnect.', ToastIcon.INFO);
},
});

View file

@ -200,6 +200,10 @@ export class Main {
Main.mainWindow?.webContents?.send('ping', message);
Main.playerWindow?.webContents?.send('ping', message);
});
l.emitter.on('pong', (message) => {
Main.mainWindow?.webContents?.send('pong', message);
Main.playerWindow?.webContents?.send('pong', message);
});
l.start();
ipcMain.on('send-playback-error', (event: IpcMainEvent, value: PlaybackErrorMessage) => {