mirror of
https://gitlab.com/futo-org/fcast.git
synced 2025-06-24 21:25:23 +00:00
Receivers: Suppress excessive toasts from quick successive sender connect/disconnect/reconnect events
This commit is contained in:
parent
dd88edae7d
commit
42d17e2fe7
5 changed files with 114 additions and 56 deletions
|
@ -7,14 +7,20 @@ export function setUiUpdateCallbacks(callbacks: any) {
|
||||||
let frontendConnections = [];
|
let frontendConnections = [];
|
||||||
|
|
||||||
window.targetAPI.onConnect((_event, value: any) => {
|
window.targetAPI.onConnect((_event, value: any) => {
|
||||||
frontendConnections.push(value.sessionId);
|
const idMapping = value.type === 'ws' ? value.sessionId : value.data.address;
|
||||||
|
|
||||||
|
logger.debug(`Processing connect event for ${idMapping} with current connections:`, frontendConnections);
|
||||||
|
frontendConnections.push(idMapping);
|
||||||
callbacks.onConnect(frontendConnections);
|
callbacks.onConnect(frontendConnections);
|
||||||
});
|
});
|
||||||
window.targetAPI.onDisconnect((_event, value: any) => {
|
window.targetAPI.onDisconnect((_event, value: any) => {
|
||||||
const index = frontendConnections.indexOf(value.sessionId);
|
const idMapping = value.type === 'ws' ? value.sessionId : value.data.address;
|
||||||
|
|
||||||
|
logger.debug(`Processing disconnect event for ${idMapping} with current connections:`, frontendConnections);
|
||||||
|
const index = frontendConnections.indexOf(idMapping);
|
||||||
if (index != -1) {
|
if (index != -1) {
|
||||||
frontendConnections.splice(index, 1);
|
frontendConnections.splice(index, 1);
|
||||||
callbacks.onDisconnect(frontendConnections, value.sessionId);
|
callbacks.onDisconnect(frontendConnections);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -29,11 +35,14 @@ export function setUiUpdateCallbacks(callbacks: any) {
|
||||||
}
|
}
|
||||||
|
|
||||||
export class ConnectionMonitor {
|
export class ConnectionMonitor {
|
||||||
|
private static logger: Logger;
|
||||||
private static initialized = false;
|
private static initialized = false;
|
||||||
private static connectionPingTimeout = 2500;
|
private static connectionPingTimeout = 2500;
|
||||||
private static heartbeatRetries = new Map();
|
private static heartbeatRetries = new Map();
|
||||||
private static backendConnections = new Map();
|
private static backendConnections = new Map();
|
||||||
private static logger;
|
private static uiConnectUpdateTimeout = 100;
|
||||||
|
private static uiDisconnectUpdateTimeout = 2000; // Senders may reconnect, but generally need more time
|
||||||
|
private static uiUpdateMap = new Map();
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
if (!ConnectionMonitor.initialized) {
|
if (!ConnectionMonitor.initialized) {
|
||||||
|
@ -49,7 +58,7 @@ export class ConnectionMonitor {
|
||||||
|
|
||||||
ConnectionMonitor.logger.debug(`Pinging session ${sessionId} with ${ConnectionMonitor.heartbeatRetries.get(sessionId)} retries left`);
|
ConnectionMonitor.logger.debug(`Pinging session ${sessionId} with ${ConnectionMonitor.heartbeatRetries.get(sessionId)} retries left`);
|
||||||
ConnectionMonitor.backendConnections.get(sessionId).send(Opcode.Ping, null, sessionId);
|
ConnectionMonitor.backendConnections.get(sessionId).send(Opcode.Ping, null, sessionId);
|
||||||
ConnectionMonitor.heartbeatRetries.set(sessionId, ConnectionMonitor.heartbeatRetries.get(sessionId) === undefined ? 1 : ConnectionMonitor.heartbeatRetries.get(sessionId) + 1);
|
ConnectionMonitor.heartbeatRetries.set(sessionId, ConnectionMonitor.heartbeatRetries.get(sessionId) + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, ConnectionMonitor.connectionPingTimeout);
|
}, ConnectionMonitor.connectionPingTimeout);
|
||||||
|
@ -58,18 +67,86 @@ export class ConnectionMonitor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static onPingPong(value: any) {
|
public static onPingPong(value: any, isWebsockets: boolean) {
|
||||||
ConnectionMonitor.logger.debug(`Received response from ${value.sessionId}`);
|
ConnectionMonitor.logger.debug(`Received response from ${value.sessionId}`);
|
||||||
|
|
||||||
|
// Websocket clients currently don't support ping-pong commands
|
||||||
|
if (!isWebsockets) {
|
||||||
|
ConnectionMonitor.heartbeatRetries.set(value.sessionId, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static onConnect(listener: any, value: any, isWebsockets: boolean, uiUpdateCallback: any) {
|
||||||
|
ConnectionMonitor.logger.info(`Device connected: ${JSON.stringify(value)}`);
|
||||||
|
const idMapping = isWebsockets ? value.sessionId : value.data.address;
|
||||||
|
|
||||||
|
if (!ConnectionMonitor.uiUpdateMap.has(idMapping)) {
|
||||||
|
ConnectionMonitor.uiUpdateMap.set(idMapping, []);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isWebsockets) {
|
||||||
|
ConnectionMonitor.backendConnections.set(value.sessionId, listener);
|
||||||
ConnectionMonitor.heartbeatRetries.set(value.sessionId, 0);
|
ConnectionMonitor.heartbeatRetries.set(value.sessionId, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static onConnect(listener: any, value: any) {
|
// Occasionally senders seem to instantaneously disconnect and reconnect, so suppress those ui updates
|
||||||
ConnectionMonitor.logger.info(`Device connected: ${JSON.stringify(value)}`);
|
const senderUpdateQueue = ConnectionMonitor.uiUpdateMap.get(idMapping);
|
||||||
ConnectionMonitor.backendConnections.set(value.sessionId, listener);
|
senderUpdateQueue.push({ event: 'connect', uiUpdateCallback: uiUpdateCallback });
|
||||||
|
ConnectionMonitor.uiUpdateMap.set(idMapping, senderUpdateQueue);
|
||||||
|
|
||||||
|
if (senderUpdateQueue.length === 1) {
|
||||||
|
setTimeout(() => { ConnectionMonitor.processUiUpdateCallbacks(idMapping); }, ConnectionMonitor.uiConnectUpdateTimeout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static onDisconnect(listener: any, value: any) {
|
public static onDisconnect(listener: any, value: any, isWebsockets: boolean, uiUpdateCallback: any) {
|
||||||
ConnectionMonitor.logger.info(`Device disconnected: ${JSON.stringify(value)}`);
|
ConnectionMonitor.logger.info(`Device disconnected: ${JSON.stringify(value)}`);
|
||||||
|
|
||||||
|
if (!isWebsockets) {
|
||||||
ConnectionMonitor.backendConnections.delete(value.sessionId);
|
ConnectionMonitor.backendConnections.delete(value.sessionId);
|
||||||
|
ConnectionMonitor.heartbeatRetries.delete(value.sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
const idMapping = isWebsockets ? value.sessionId : value.data.address;
|
||||||
|
const senderUpdateQueue = ConnectionMonitor.uiUpdateMap.get(idMapping);
|
||||||
|
senderUpdateQueue.push({ event: 'disconnect', uiUpdateCallback: uiUpdateCallback });
|
||||||
|
ConnectionMonitor.uiUpdateMap.set(idMapping, senderUpdateQueue);
|
||||||
|
|
||||||
|
if (senderUpdateQueue.length === 1) {
|
||||||
|
setTimeout(() => { ConnectionMonitor.processUiUpdateCallbacks(idMapping); }, ConnectionMonitor.uiDisconnectUpdateTimeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static processUiUpdateCallbacks(mapId: string) {
|
||||||
|
const updateQueue = ConnectionMonitor.uiUpdateMap.get(mapId);
|
||||||
|
let lastConnectCb: any;
|
||||||
|
let lastDisconnectCb: any;
|
||||||
|
let messageCount = 0;
|
||||||
|
|
||||||
|
updateQueue.forEach(update => {
|
||||||
|
ConnectionMonitor.logger.debug(`Processing update event '${update.event}' for ${mapId}`);
|
||||||
|
if (update.event === 'connect') {
|
||||||
|
messageCount += 1;
|
||||||
|
lastConnectCb = update.uiUpdateCallback;
|
||||||
|
}
|
||||||
|
else if (update.event === 'disconnect') {
|
||||||
|
messageCount -= 1;
|
||||||
|
lastDisconnectCb = update.uiUpdateCallback;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ConnectionMonitor.logger.warn('Unrecognized UI update event:', update.event)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (messageCount > 0) {
|
||||||
|
ConnectionMonitor.logger.debug(`Sending connect event for ${mapId}`);
|
||||||
|
lastConnectCb();
|
||||||
|
}
|
||||||
|
else if (messageCount < 0) {
|
||||||
|
ConnectionMonitor.logger.debug(`Sending disconnect event for ${mapId}`);
|
||||||
|
lastDisconnectCb();
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectionMonitor.uiUpdateMap.set(mapId, []);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,6 @@ export class TcpListenerService {
|
||||||
emitter = new EventEmitter();
|
emitter = new EventEmitter();
|
||||||
|
|
||||||
private server: net.Server;
|
private server: net.Server;
|
||||||
private sessions: FCastSession[] = [];
|
|
||||||
private sessionMap = new Map();
|
private sessionMap = new Map();
|
||||||
|
|
||||||
start() {
|
start() {
|
||||||
|
@ -48,14 +47,14 @@ export class TcpListenerService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this.sessions.forEach(session => {
|
for (const session of this.sessionMap.values()) {
|
||||||
try {
|
try {
|
||||||
session.send(opcode, message);
|
session.send(opcode, message);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.warn("Failed to send error.", e);
|
logger.warn("Failed to send error.", e);
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,6 +63,12 @@ export class TcpListenerService {
|
||||||
this.sessionMap.delete(sessionId);
|
this.sessionMap.delete(sessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public getSenders(): string[] {
|
||||||
|
const senders = [];
|
||||||
|
this.sessionMap.forEach((sender) => { senders.push(sender.socket.remoteAddress); });
|
||||||
|
return senders;
|
||||||
|
}
|
||||||
|
|
||||||
public getSessions(): string[] {
|
public getSessions(): string[] {
|
||||||
return [...this.sessionMap.keys()];
|
return [...this.sessionMap.keys()];
|
||||||
}
|
}
|
||||||
|
@ -77,7 +82,6 @@ export class TcpListenerService {
|
||||||
|
|
||||||
const session = new FCastSession(socket, (data) => socket.write(data));
|
const session = new FCastSession(socket, (data) => socket.write(data));
|
||||||
session.bindEvents(this.emitter);
|
session.bindEvents(this.emitter);
|
||||||
this.sessions.push(session);
|
|
||||||
this.sessionMap.set(session.sessionId, session);
|
this.sessionMap.set(session.sessionId, session);
|
||||||
|
|
||||||
socket.on("error", (err) => {
|
socket.on("error", (err) => {
|
||||||
|
@ -95,11 +99,6 @@ export class TcpListenerService {
|
||||||
});
|
});
|
||||||
|
|
||||||
socket.on("close", () => {
|
socket.on("close", () => {
|
||||||
const index = this.sessions.indexOf(session);
|
|
||||||
if (index != -1) {
|
|
||||||
this.sessions.splice(index, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.sessionMap.delete(session.sessionId);
|
this.sessionMap.delete(session.sessionId);
|
||||||
this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'tcp', data: { address: socket.remoteAddress, port: socket.remotePort }});
|
this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'tcp', data: { address: socket.remoteAddress, port: socket.remotePort }});
|
||||||
});
|
});
|
||||||
|
|
|
@ -12,7 +12,6 @@ export class WebSocketListenerService {
|
||||||
emitter = new EventEmitter();
|
emitter = new EventEmitter();
|
||||||
|
|
||||||
private server: WebSocketServer;
|
private server: WebSocketServer;
|
||||||
private sessions: FCastSession[] = [];
|
|
||||||
private sessionMap = new Map();
|
private sessionMap = new Map();
|
||||||
|
|
||||||
start() {
|
start() {
|
||||||
|
@ -46,20 +45,19 @@ export class WebSocketListenerService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
this.sessions.forEach(session => {
|
for (const session of this.sessionMap.values()) {
|
||||||
try {
|
try {
|
||||||
session.send(opcode, message);
|
session.send(opcode, message);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.warn("Failed to send error.", e);
|
logger.warn("Failed to send error.", e);
|
||||||
session.close();
|
session.close();
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
disconnect(sessionId: string) {
|
disconnect(sessionId: string) {
|
||||||
this.sessionMap.get(sessionId)?.close();
|
this.sessionMap.get(sessionId)?.close();
|
||||||
this.sessionMap.delete(sessionId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public getSessions(): string[] {
|
public getSessions(): string[] {
|
||||||
|
@ -70,12 +68,11 @@ export class WebSocketListenerService {
|
||||||
errorHandler(err);
|
errorHandler(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
private handleConnection(socket: WebSocket) {
|
private handleConnection(socket: WebSocket, request: any) {
|
||||||
logger.info('New WebSocket connection');
|
logger.info('New WebSocket connection');
|
||||||
|
|
||||||
const session = new FCastSession(socket, (data) => socket.send(data));
|
const session = new FCastSession(socket, (data) => socket.send(data));
|
||||||
session.bindEvents(this.emitter);
|
session.bindEvents(this.emitter);
|
||||||
this.sessions.push(session);
|
|
||||||
this.sessionMap.set(session.sessionId, session);
|
this.sessionMap.set(session.sessionId, session);
|
||||||
|
|
||||||
socket.on("error", (err) => {
|
socket.on("error", (err) => {
|
||||||
|
@ -97,13 +94,6 @@ export class WebSocketListenerService {
|
||||||
});
|
});
|
||||||
|
|
||||||
socket.on("close", () => {
|
socket.on("close", () => {
|
||||||
logger.info('WebSocket connection closed');
|
|
||||||
|
|
||||||
const index = this.sessions.indexOf(session);
|
|
||||||
if (index != -1) {
|
|
||||||
this.sessions.splice(index, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.sessionMap.delete(session.sessionId);
|
this.sessionMap.delete(session.sessionId);
|
||||||
this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'ws', data: { url: socket.url }});
|
this.emitter.emit('disconnect', { sessionId: session.sessionId, type: 'ws', data: { url: socket.url }});
|
||||||
});
|
});
|
||||||
|
|
|
@ -55,7 +55,7 @@ if (TARGET === 'electron') {
|
||||||
window.targetAPI = {
|
window.targetAPI = {
|
||||||
onDeviceInfo: (callback: () => void) => preloadData.onDeviceInfoCb = callback,
|
onDeviceInfo: (callback: () => void) => preloadData.onDeviceInfoCb = callback,
|
||||||
getDeviceInfo: () => preloadData.deviceInfo,
|
getDeviceInfo: () => preloadData.deviceInfo,
|
||||||
getSessions: (callback: () => void) => preloadData.getSessionsCb = callback,
|
getSessions: (callback: () => any) => preloadData.getSessionsCb = callback,
|
||||||
onConnect: (callback: (_, value: any) => void) => preloadData.onConnectCb = callback,
|
onConnect: (callback: (_, value: any) => void) => preloadData.onConnectCb = callback,
|
||||||
onDisconnect: (callback: (_, value: any) => void) => preloadData.onDisconnectCb = callback,
|
onDisconnect: (callback: (_, value: any) => void) => preloadData.onDisconnectCb = callback,
|
||||||
logger: loggerInterface,
|
logger: loggerInterface,
|
||||||
|
|
|
@ -193,31 +193,22 @@ export class Main {
|
||||||
l.emitter.on("setspeed", (message) => Main.playerWindow?.webContents?.send("setspeed", message));
|
l.emitter.on("setspeed", (message) => Main.playerWindow?.webContents?.send("setspeed", message));
|
||||||
|
|
||||||
l.emitter.on('connect', (message) => {
|
l.emitter.on('connect', (message) => {
|
||||||
// Websocket clients currently don't have ping-pong commands supported
|
ConnectionMonitor.onConnect(l, message, l instanceof WebSocketListenerService, () => {
|
||||||
if (l instanceof TcpListenerService) {
|
|
||||||
ConnectionMonitor.onConnect(l, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
Main.mainWindow?.webContents?.send('connect', message);
|
Main.mainWindow?.webContents?.send('connect', message);
|
||||||
Main.playerWindow?.webContents?.send('connect', message);
|
Main.playerWindow?.webContents?.send('connect', message);
|
||||||
});
|
});
|
||||||
|
});
|
||||||
l.emitter.on('disconnect', (message) => {
|
l.emitter.on('disconnect', (message) => {
|
||||||
if (l instanceof TcpListenerService) {
|
ConnectionMonitor.onDisconnect(l, message, l instanceof WebSocketListenerService, () => {
|
||||||
ConnectionMonitor.onDisconnect(l, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
Main.mainWindow?.webContents?.send('disconnect', message);
|
Main.mainWindow?.webContents?.send('disconnect', message);
|
||||||
Main.playerWindow?.webContents?.send('disconnect', message);
|
Main.playerWindow?.webContents?.send('disconnect', message);
|
||||||
});
|
});
|
||||||
|
});
|
||||||
l.emitter.on('ping', (message) => {
|
l.emitter.on('ping', (message) => {
|
||||||
if (l instanceof TcpListenerService) {
|
ConnectionMonitor.onPingPong(message, l instanceof WebSocketListenerService);
|
||||||
ConnectionMonitor.onPingPong(message);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
l.emitter.on('pong', (message) => {
|
l.emitter.on('pong', (message) => {
|
||||||
if (l instanceof TcpListenerService) {
|
ConnectionMonitor.onPingPong(message, l instanceof WebSocketListenerService);
|
||||||
ConnectionMonitor.onPingPong(message);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
l.start();
|
l.start();
|
||||||
|
|
||||||
|
@ -287,8 +278,9 @@ export class Main {
|
||||||
window.setFullScreen(false);
|
window.setFullScreen(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Having to mix and match session ids and ip addresses until querying websocket remote addresses is fixed
|
||||||
ipcMain.handle('get-sessions', () => {
|
ipcMain.handle('get-sessions', () => {
|
||||||
return [].concat(Main.tcpListenerService.getSessions(), Main.webSocketListenerService.getSessions());
|
return [].concat(Main.tcpListenerService.getSenders(), Main.webSocketListenerService.getSessions());
|
||||||
});
|
});
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
@ -477,8 +469,8 @@ export async function errorHandler(error: Error) {
|
||||||
|
|
||||||
const restartPrompt = await dialog.showMessageBox({
|
const restartPrompt = await dialog.showMessageBox({
|
||||||
type: 'error',
|
type: 'error',
|
||||||
title: 'Failed to start',
|
title: 'Application Error',
|
||||||
message: `The application failed to start properly:\n\n${error.stack}}`,
|
message: `The application encountered an error:\n\n${error.stack}}`,
|
||||||
buttons: ['Restart', 'Close'],
|
buttons: ['Restart', 'Close'],
|
||||||
defaultId: 0,
|
defaultId: 0,
|
||||||
cancelId: 1
|
cancelId: 1
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue