From 42d17e2fe7a4ab18019a0b3383d2d979822273e1 Mon Sep 17 00:00:00 2001 From: Michael Hollister Date: Fri, 2 May 2025 15:42:44 -0500 Subject: [PATCH] Receivers: Suppress excessive toasts from quick successive sender connect/disconnect/reconnect events --- receivers/common/web/ConnectionMonitor.ts | 99 ++++++++++++++++--- receivers/common/web/TcpListenerService.ts | 17 ++-- .../common/web/WebSocketListenerService.ts | 16 +-- receivers/common/web/main/Preload.ts | 2 +- receivers/electron/src/Main.ts | 36 +++---- 5 files changed, 114 insertions(+), 56 deletions(-) diff --git a/receivers/common/web/ConnectionMonitor.ts b/receivers/common/web/ConnectionMonitor.ts index 20c9b6c..68430cf 100644 --- a/receivers/common/web/ConnectionMonitor.ts +++ b/receivers/common/web/ConnectionMonitor.ts @@ -7,14 +7,20 @@ export function setUiUpdateCallbacks(callbacks: any) { let frontendConnections = []; window.targetAPI.onConnect((_event, value: any) => { - frontendConnections.push(value.sessionId); + const idMapping = value.type === 'ws' ? value.sessionId : value.data.address; + + logger.debug(`Processing connect event for ${idMapping} with current connections:`, frontendConnections); + frontendConnections.push(idMapping); callbacks.onConnect(frontendConnections); }); window.targetAPI.onDisconnect((_event, value: any) => { - const index = frontendConnections.indexOf(value.sessionId); + const idMapping = value.type === 'ws' ? value.sessionId : value.data.address; + + logger.debug(`Processing disconnect event for ${idMapping} with current connections:`, frontendConnections); + const index = frontendConnections.indexOf(idMapping); if (index != -1) { frontendConnections.splice(index, 1); - callbacks.onDisconnect(frontendConnections, value.sessionId); + callbacks.onDisconnect(frontendConnections); } }); @@ -29,11 +35,14 @@ export function setUiUpdateCallbacks(callbacks: any) { } export class ConnectionMonitor { + private static logger: Logger; private static initialized = false; private static connectionPingTimeout = 2500; private static heartbeatRetries = new Map(); private static backendConnections = new Map(); - private static logger; + private static uiConnectUpdateTimeout = 100; + private static uiDisconnectUpdateTimeout = 2000; // Senders may reconnect, but generally need more time + private static uiUpdateMap = new Map(); constructor() { if (!ConnectionMonitor.initialized) { @@ -49,7 +58,7 @@ export class ConnectionMonitor { ConnectionMonitor.logger.debug(`Pinging session ${sessionId} with ${ConnectionMonitor.heartbeatRetries.get(sessionId)} retries left`); ConnectionMonitor.backendConnections.get(sessionId).send(Opcode.Ping, null, sessionId); - ConnectionMonitor.heartbeatRetries.set(sessionId, ConnectionMonitor.heartbeatRetries.get(sessionId) === undefined ? 1 : ConnectionMonitor.heartbeatRetries.get(sessionId) + 1); + ConnectionMonitor.heartbeatRetries.set(sessionId, ConnectionMonitor.heartbeatRetries.get(sessionId) + 1); } } }, ConnectionMonitor.connectionPingTimeout); @@ -58,18 +67,86 @@ export class ConnectionMonitor { } } - public static onPingPong(value: any) { + public static onPingPong(value: any, isWebsockets: boolean) { ConnectionMonitor.logger.debug(`Received response from ${value.sessionId}`); - ConnectionMonitor.heartbeatRetries.set(value.sessionId, 0); + + // Websocket clients currently don't support ping-pong commands + if (!isWebsockets) { + ConnectionMonitor.heartbeatRetries.set(value.sessionId, 0); + } } - public static onConnect(listener: any, value: any) { + public static onConnect(listener: any, value: any, isWebsockets: boolean, uiUpdateCallback: any) { ConnectionMonitor.logger.info(`Device connected: ${JSON.stringify(value)}`); - ConnectionMonitor.backendConnections.set(value.sessionId, listener); + const idMapping = isWebsockets ? value.sessionId : value.data.address; + + if (!ConnectionMonitor.uiUpdateMap.has(idMapping)) { + ConnectionMonitor.uiUpdateMap.set(idMapping, []); + } + + if (!isWebsockets) { + ConnectionMonitor.backendConnections.set(value.sessionId, listener); + ConnectionMonitor.heartbeatRetries.set(value.sessionId, 0); + } + + // Occasionally senders seem to instantaneously disconnect and reconnect, so suppress those ui updates + const senderUpdateQueue = ConnectionMonitor.uiUpdateMap.get(idMapping); + senderUpdateQueue.push({ event: 'connect', uiUpdateCallback: uiUpdateCallback }); + ConnectionMonitor.uiUpdateMap.set(idMapping, senderUpdateQueue); + + if (senderUpdateQueue.length === 1) { + setTimeout(() => { ConnectionMonitor.processUiUpdateCallbacks(idMapping); }, ConnectionMonitor.uiConnectUpdateTimeout); + } } - public static onDisconnect(listener: any, value: any) { + public static onDisconnect(listener: any, value: any, isWebsockets: boolean, uiUpdateCallback: any) { ConnectionMonitor.logger.info(`Device disconnected: ${JSON.stringify(value)}`); - ConnectionMonitor.backendConnections.delete(value.sessionId); + + if (!isWebsockets) { + ConnectionMonitor.backendConnections.delete(value.sessionId); + ConnectionMonitor.heartbeatRetries.delete(value.sessionId); + } + + const idMapping = isWebsockets ? value.sessionId : value.data.address; + const senderUpdateQueue = ConnectionMonitor.uiUpdateMap.get(idMapping); + senderUpdateQueue.push({ event: 'disconnect', uiUpdateCallback: uiUpdateCallback }); + ConnectionMonitor.uiUpdateMap.set(idMapping, senderUpdateQueue); + + if (senderUpdateQueue.length === 1) { + setTimeout(() => { ConnectionMonitor.processUiUpdateCallbacks(idMapping); }, ConnectionMonitor.uiDisconnectUpdateTimeout); + } + } + + private static processUiUpdateCallbacks(mapId: string) { + const updateQueue = ConnectionMonitor.uiUpdateMap.get(mapId); + let lastConnectCb: any; + let lastDisconnectCb: any; + let messageCount = 0; + + updateQueue.forEach(update => { + ConnectionMonitor.logger.debug(`Processing update event '${update.event}' for ${mapId}`); + if (update.event === 'connect') { + messageCount += 1; + lastConnectCb = update.uiUpdateCallback; + } + else if (update.event === 'disconnect') { + messageCount -= 1; + lastDisconnectCb = update.uiUpdateCallback; + } + else { + ConnectionMonitor.logger.warn('Unrecognized UI update event:', update.event) + } + }); + + if (messageCount > 0) { + ConnectionMonitor.logger.debug(`Sending connect event for ${mapId}`); + lastConnectCb(); + } + else if (messageCount < 0) { + ConnectionMonitor.logger.debug(`Sending disconnect event for ${mapId}`); + lastDisconnectCb(); + } + + ConnectionMonitor.uiUpdateMap.set(mapId, []); } } diff --git a/receivers/common/web/TcpListenerService.ts b/receivers/common/web/TcpListenerService.ts index 1c0e32d..268c7ce 100644 --- a/receivers/common/web/TcpListenerService.ts +++ b/receivers/common/web/TcpListenerService.ts @@ -11,7 +11,6 @@ export class TcpListenerService { emitter = new EventEmitter(); private server: net.Server; - private sessions: FCastSession[] = []; private sessionMap = new Map(); start() { @@ -48,14 +47,14 @@ export class TcpListenerService { } } else { - this.sessions.forEach(session => { + for (const session of this.sessionMap.values()) { try { session.send(opcode, message); } catch (e) { logger.warn("Failed to send error.", e); session.close(); } - }); + } } } @@ -64,6 +63,12 @@ export class TcpListenerService { this.sessionMap.delete(sessionId); } + public getSenders(): string[] { + const senders = []; + this.sessionMap.forEach((sender) => { senders.push(sender.socket.remoteAddress); }); + return senders; + } + public getSessions(): string[] { return [...this.sessionMap.keys()]; } @@ -77,7 +82,6 @@ export class TcpListenerService { const session = new FCastSession(socket, (data) => socket.write(data)); session.bindEvents(this.emitter); - this.sessions.push(session); this.sessionMap.set(session.sessionId, session); socket.on("error", (err) => { @@ -95,11 +99,6 @@ export class TcpListenerService { }); socket.on("close", () => { - const index = this.sessions.indexOf(session); - if (index != -1) { - this.sessions.splice(index, 1); - } - this.sessionMap.delete(session.sessionId); this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'tcp', data: { address: socket.remoteAddress, port: socket.remotePort }}); }); diff --git a/receivers/common/web/WebSocketListenerService.ts b/receivers/common/web/WebSocketListenerService.ts index 60f0d2c..4be5118 100644 --- a/receivers/common/web/WebSocketListenerService.ts +++ b/receivers/common/web/WebSocketListenerService.ts @@ -12,7 +12,6 @@ export class WebSocketListenerService { emitter = new EventEmitter(); private server: WebSocketServer; - private sessions: FCastSession[] = []; private sessionMap = new Map(); start() { @@ -46,20 +45,19 @@ export class WebSocketListenerService { } } else { - this.sessions.forEach(session => { + for (const session of this.sessionMap.values()) { try { session.send(opcode, message); } catch (e) { logger.warn("Failed to send error.", e); session.close(); } - }); + } } } disconnect(sessionId: string) { this.sessionMap.get(sessionId)?.close(); - this.sessionMap.delete(sessionId); } public getSessions(): string[] { @@ -70,12 +68,11 @@ export class WebSocketListenerService { errorHandler(err); } - private handleConnection(socket: WebSocket) { + private handleConnection(socket: WebSocket, request: any) { logger.info('New WebSocket connection'); const session = new FCastSession(socket, (data) => socket.send(data)); session.bindEvents(this.emitter); - this.sessions.push(session); this.sessionMap.set(session.sessionId, session); socket.on("error", (err) => { @@ -97,13 +94,6 @@ export class WebSocketListenerService { }); socket.on("close", () => { - logger.info('WebSocket connection closed'); - - const index = this.sessions.indexOf(session); - if (index != -1) { - this.sessions.splice(index, 1); - } - this.sessionMap.delete(session.sessionId); this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'ws', data: { url: socket.url }}); }); diff --git a/receivers/common/web/main/Preload.ts b/receivers/common/web/main/Preload.ts index 001f8ed..c05e342 100644 --- a/receivers/common/web/main/Preload.ts +++ b/receivers/common/web/main/Preload.ts @@ -55,7 +55,7 @@ if (TARGET === 'electron') { window.targetAPI = { onDeviceInfo: (callback: () => void) => preloadData.onDeviceInfoCb = callback, getDeviceInfo: () => preloadData.deviceInfo, - getSessions: (callback: () => void) => preloadData.getSessionsCb = callback, + getSessions: (callback: () => any) => preloadData.getSessionsCb = callback, onConnect: (callback: (_, value: any) => void) => preloadData.onConnectCb = callback, onDisconnect: (callback: (_, value: any) => void) => preloadData.onDisconnectCb = callback, logger: loggerInterface, diff --git a/receivers/electron/src/Main.ts b/receivers/electron/src/Main.ts index 9119c06..68ec8d0 100644 --- a/receivers/electron/src/Main.ts +++ b/receivers/electron/src/Main.ts @@ -193,31 +193,22 @@ export class Main { l.emitter.on("setspeed", (message) => Main.playerWindow?.webContents?.send("setspeed", message)); l.emitter.on('connect', (message) => { - // Websocket clients currently don't have ping-pong commands supported - if (l instanceof TcpListenerService) { - ConnectionMonitor.onConnect(l, message); - } - - Main.mainWindow?.webContents?.send('connect', message); - Main.playerWindow?.webContents?.send('connect', message); + ConnectionMonitor.onConnect(l, message, l instanceof WebSocketListenerService, () => { + Main.mainWindow?.webContents?.send('connect', message); + Main.playerWindow?.webContents?.send('connect', message); + }); }); l.emitter.on('disconnect', (message) => { - if (l instanceof TcpListenerService) { - ConnectionMonitor.onDisconnect(l, message); - } - - Main.mainWindow?.webContents?.send('disconnect', message); - Main.playerWindow?.webContents?.send('disconnect', message); + ConnectionMonitor.onDisconnect(l, message, l instanceof WebSocketListenerService, () => { + Main.mainWindow?.webContents?.send('disconnect', message); + Main.playerWindow?.webContents?.send('disconnect', message); + }); }); l.emitter.on('ping', (message) => { - if (l instanceof TcpListenerService) { - ConnectionMonitor.onPingPong(message); - } + ConnectionMonitor.onPingPong(message, l instanceof WebSocketListenerService); }); l.emitter.on('pong', (message) => { - if (l instanceof TcpListenerService) { - ConnectionMonitor.onPingPong(message); - } + ConnectionMonitor.onPingPong(message, l instanceof WebSocketListenerService); }); l.start(); @@ -287,8 +278,9 @@ export class Main { window.setFullScreen(false); }); + // Having to mix and match session ids and ip addresses until querying websocket remote addresses is fixed ipcMain.handle('get-sessions', () => { - return [].concat(Main.tcpListenerService.getSessions(), Main.webSocketListenerService.getSessions()); + return [].concat(Main.tcpListenerService.getSenders(), Main.webSocketListenerService.getSessions()); }); // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -477,8 +469,8 @@ export async function errorHandler(error: Error) { const restartPrompt = await dialog.showMessageBox({ type: 'error', - title: 'Failed to start', - message: `The application failed to start properly:\n\n${error.stack}}`, + title: 'Application Error', + message: `The application encountered an error:\n\n${error.stack}}`, buttons: ['Restart', 'Close'], defaultId: 0, cancelId: 1