From ba8cd46e4cdb6585c230dd2666bcf8ea62d57e31 Mon Sep 17 00:00:00 2001 From: Koen Date: Thu, 22 Feb 2024 11:37:52 +0100 Subject: [PATCH] Added write/read semaphores. --- .../FCastClient.Cli/FCastClient.Cli.csproj | 2 +- .../FCastClient.Cli/Program.cs | 2 + .../FCastClient/FCastClient.csproj | 2 +- .../FCastClient/FCastSession.cs | 74 +++++++++++++------ 4 files changed, 56 insertions(+), 24 deletions(-) diff --git a/clients/terminal-dotnet/FCastClient.Cli/FCastClient.Cli.csproj b/clients/terminal-dotnet/FCastClient.Cli/FCastClient.Cli.csproj index eb806fe..79059af 100644 --- a/clients/terminal-dotnet/FCastClient.Cli/FCastClient.Cli.csproj +++ b/clients/terminal-dotnet/FCastClient.Cli/FCastClient.Cli.csproj @@ -10,7 +10,7 @@ Exe - net7.0 + net8.0 enable enable diff --git a/clients/terminal-dotnet/FCastClient.Cli/Program.cs b/clients/terminal-dotnet/FCastClient.Cli/Program.cs index 74a84fe..bed8183 100644 --- a/clients/terminal-dotnet/FCastClient.Cli/Program.cs +++ b/clients/terminal-dotnet/FCastClient.Cli/Program.cs @@ -121,6 +121,8 @@ internal class Program .Build(); CommandMatches matches = rootCommand.Parse(args); + Console.WriteLine(matches.ToString()); + var host = matches.Value("host")!; var connectionType = matches.Value("connection_type")!; diff --git a/clients/terminal-dotnet/FCastClient/FCastClient.csproj b/clients/terminal-dotnet/FCastClient/FCastClient.csproj index 4658cbf..1d2b313 100644 --- a/clients/terminal-dotnet/FCastClient/FCastClient.csproj +++ b/clients/terminal-dotnet/FCastClient/FCastClient.csproj @@ -1,7 +1,7 @@ - net7.0 + net8.0 enable enable diff --git a/clients/terminal-dotnet/FCastClient/FCastSession.cs b/clients/terminal-dotnet/FCastClient/FCastSession.cs index 28d9c78..f84a402 100644 --- a/clients/terminal-dotnet/FCastClient/FCastSession.cs +++ b/clients/terminal-dotnet/FCastClient/FCastSession.cs @@ -41,6 +41,8 @@ public class FCastSession : IDisposable private int _bytesRead; private int _packetLength; private Stream _stream; + private SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1); + private SemaphoreSlim _readerSemaphore = new SemaphoreSlim(1); private SessionState _state; public FCastSession(Stream stream) @@ -51,30 +53,48 @@ public class FCastSession : IDisposable public async Task SendMessageAsync(Opcode opcode, CancellationToken cancellationToken) { - int size = 1; - byte[] header = new byte[LengthBytes + 1]; - Array.Copy(BitConverter.GetBytes(size), header, LengthBytes); - header[LengthBytes] = (byte)opcode; + await _writerSemaphore.WaitAsync(); - Console.WriteLine($"Sent {header.Length} bytes with (opcode: {opcode}, header size: {header.Length}, no body)."); - await _stream.WriteAsync(header, cancellationToken); + try + { + int size = 1; + byte[] header = new byte[LengthBytes + 1]; + Array.Copy(BitConverter.GetBytes(size), header, LengthBytes); + header[LengthBytes] = (byte)opcode; + + Console.WriteLine($"Sent {header.Length} bytes with (opcode: {opcode}, header size: {header.Length}, no body)."); + await _stream.WriteAsync(header, cancellationToken); + } + finally + { + _writerSemaphore.Release(); + } } public async Task SendMessageAsync(Opcode opcode, T message, CancellationToken cancellationToken) where T : class { - string json = JsonSerializer.Serialize(message); - byte[] data = Encoding.UTF8.GetBytes(json); - int size = 1 + data.Length; - byte[] header = new byte[LengthBytes + 1]; - Array.Copy(BitConverter.GetBytes(size), header, LengthBytes); - header[LengthBytes] = (byte)opcode; + await _writerSemaphore.WaitAsync(); - byte[] packet = new byte[header.Length + data.Length]; - header.CopyTo(packet, 0); - data.CopyTo(packet, header.Length); + try + { + string json = JsonSerializer.Serialize(message); + byte[] data = Encoding.UTF8.GetBytes(json); + int size = 1 + data.Length; + byte[] header = new byte[LengthBytes + 1]; + Array.Copy(BitConverter.GetBytes(size), header, LengthBytes); + header[LengthBytes] = (byte)opcode; - Console.WriteLine($"Sent {packet.Length} bytes with (opcode: {opcode}, header size: {header.Length}, body size: {data.Length}, body: {json})."); - await _stream.WriteAsync(packet, cancellationToken); + byte[] packet = new byte[header.Length + data.Length]; + header.CopyTo(packet, 0); + data.CopyTo(packet, header.Length); + + Console.WriteLine($"Sent {packet.Length} bytes with (opcode: {opcode}, header size: {header.Length}, body size: {data.Length}, body: {json})."); + await _stream.WriteAsync(packet, cancellationToken); + } + finally + { + _writerSemaphore.Release(); + } } public async Task ReceiveLoopAsync(CancellationToken cancellationToken) @@ -85,12 +105,22 @@ public class FCastSession : IDisposable byte[] buffer = new byte[1024]; while (!cancellationToken.IsCancellationRequested) { - int bytesRead = await _stream.ReadAsync(buffer, cancellationToken); - if (bytesRead == 0) + await _readerSemaphore.WaitAsync(); + + int bytesRead; + try { - Console.WriteLine("Connection shutdown gracefully."); - Dispose(); - break; + bytesRead = await _stream.ReadAsync(buffer, cancellationToken); + if (bytesRead == 0) + { + Console.WriteLine("Connection shutdown gracefully."); + Dispose(); + break; + } + } + finally + { + _readerSemaphore.Release(); } await ProcessBytesAsync(buffer, bytesRead, cancellationToken);