1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-06-24 21:25:23 +00:00

Made ServerSocket loop more robust.

This commit is contained in:
Koen 2024-01-02 17:54:53 +01:00
parent a3b9be0cc4
commit 2a5f0f95e3

View file

@ -2,12 +2,12 @@ package com.futo.fcast.receiver
import android.util.Log import android.util.Log
import java.io.BufferedInputStream import java.io.BufferedInputStream
import java.net.InetSocketAddress
import java.net.ServerSocket import java.net.ServerSocket
import java.net.Socket import java.net.Socket
import java.util.ArrayList import java.util.ArrayList
class TcpListenerService(private val _networkService: NetworkService, private val _onNewSession: (session: FCastSession) -> Unit) { class TcpListenerService(private val _networkService: NetworkService, private val _onNewSession: (session: FCastSession) -> Unit) {
private var _serverSocket: ServerSocket? = null
private var _stopped: Boolean = false private var _stopped: Boolean = false
private var _listenThread: Thread? = null private var _listenThread: Thread? = null
private var _clientThreads: ArrayList<Thread> = arrayListOf() private var _clientThreads: ArrayList<Thread> = arrayListOf()
@ -36,9 +36,6 @@ class TcpListenerService(private val _networkService: NetworkService, private va
_stopped = true _stopped = true
_serverSocket?.close()
_serverSocket = null
_listenThread?.join() _listenThread?.join()
_listenThread = null _listenThread = null
@ -60,65 +57,86 @@ class TcpListenerService(private val _networkService: NetworkService, private va
private fun listenForIncomingConnections() { private fun listenForIncomingConnections() {
Log.i(TAG, "Started listening for incoming connections") Log.i(TAG, "Started listening for incoming connections")
_serverSocket = ServerSocket(PORT)
while (!_stopped) { while (!_stopped) {
val clientSocket = _serverSocket?.accept() ?: break try {
val serverSocket = ServerSocket()
val clientThread = Thread {
try { try {
handleClientConnection(clientSocket) serverSocket.bind(InetSocketAddress(PORT))
while (!_stopped) {
val clientSocket = serverSocket.accept() ?: break
val clientThread = Thread {
try {
Log.i(TAG, "New connection received from ${clientSocket.remoteSocketAddress}")
handleClientConnection(clientSocket)
} catch (e: Throwable) {
Log.e(TAG, "Failed handle client connection due to an error", e)
} finally {
try {
clientSocket.close()
synchronized(_clientThreads) {
_clientThreads.remove(Thread.currentThread())
}
Log.i(TAG, "Disconnected ${clientSocket.remoteSocketAddress}")
} catch (e: Throwable) {
Log.e(TAG, "Failed to close client socket", e)
}
}
}
synchronized(_clientThreads) {
_clientThreads.add(clientThread)
}
clientThread.start()
}
} catch (e: Throwable) { } catch (e: Throwable) {
Log.e(TAG, "Failed handle client connection due to an error", e) Log.e(TAG, "Failed to accept client connection due to an error, sleeping 1 second then restarting", e)
Thread.sleep(1000)
} finally {
serverSocket.close()
} }
} catch (e: Throwable) {
Log.e(TAG, "Failed to create server socket, sleeping 1 second then restarting", e)
Thread.sleep(1000)
} }
synchronized(_clientThreads) {
_clientThreads.add(clientThread)
}
clientThread.start()
} }
Log.i(TAG, "Stopped listening for incoming connections") Log.i(TAG, "Stopped listening for incoming connections")
} }
private fun handleClientConnection(socket: Socket) { private fun handleClientConnection(socket: Socket) {
Log.i(TAG, "New connection received from ${socket.remoteSocketAddress}")
val session = FCastSession(socket.getOutputStream(), socket.remoteSocketAddress, _networkService) val session = FCastSession(socket.getOutputStream(), socket.remoteSocketAddress, _networkService)
synchronized(_sessions) {
_sessions.add(session)
}
_onNewSession(session)
Log.i(TAG, "Waiting for data from ${socket.remoteSocketAddress}") try {
synchronized(_sessions) {
val bufferSize = 4096 _sessions.add(session)
val buffer = ByteArray(bufferSize)
val inputStream = BufferedInputStream(socket.getInputStream())
var bytesRead: Int
while (!_stopped) {
bytesRead = inputStream.read(buffer, 0, bufferSize)
if (bytesRead == -1) {
break
} }
_onNewSession(session)
session.processBytes(buffer, bytesRead) Log.i(TAG, "Waiting for data from ${socket.remoteSocketAddress}")
val bufferSize = 4096
val buffer = ByteArray(bufferSize)
val inputStream = BufferedInputStream(socket.getInputStream())
var bytesRead: Int
while (!_stopped) {
bytesRead = inputStream.read(buffer, 0, bufferSize)
if (bytesRead == -1) {
break
}
session.processBytes(buffer, bytesRead)
}
} finally {
synchronized(_sessions) {
_sessions.remove(session)
}
} }
socket.close()
synchronized(_sessions) {
_sessions.remove(session)
}
synchronized(_clientThreads) {
_clientThreads.remove(Thread.currentThread())
}
Log.i(TAG, "Disconnected ${socket.remoteSocketAddress}")
} }
companion object { companion object {