1
0
Fork 0
mirror of https://gitlab.com/futo-org/fcast.git synced 2025-06-24 21:25:23 +00:00

Added write/read semaphores.

This commit is contained in:
Koen 2024-02-22 11:37:52 +01:00
parent 47ea3a3c42
commit ba8cd46e4c
4 changed files with 56 additions and 24 deletions

View file

@ -10,7 +10,7 @@
<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType> <OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework> <TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings> <ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>

View file

@ -121,6 +121,8 @@ internal class Program
.Build(); .Build();
CommandMatches matches = rootCommand.Parse(args); CommandMatches matches = rootCommand.Parse(args);
Console.WriteLine(matches.ToString());
var host = matches.Value("host")!; var host = matches.Value("host")!;
var connectionType = matches.Value("connection_type")!; var connectionType = matches.Value("connection_type")!;

View file

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFramework>net7.0</TargetFramework> <TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings> <ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>

View file

@ -41,6 +41,8 @@ public class FCastSession : IDisposable
private int _bytesRead; private int _bytesRead;
private int _packetLength; private int _packetLength;
private Stream _stream; private Stream _stream;
private SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1);
private SemaphoreSlim _readerSemaphore = new SemaphoreSlim(1);
private SessionState _state; private SessionState _state;
public FCastSession(Stream stream) public FCastSession(Stream stream)
@ -51,30 +53,48 @@ public class FCastSession : IDisposable
public async Task SendMessageAsync(Opcode opcode, CancellationToken cancellationToken) public async Task SendMessageAsync(Opcode opcode, CancellationToken cancellationToken)
{ {
int size = 1; await _writerSemaphore.WaitAsync();
byte[] header = new byte[LengthBytes + 1];
Array.Copy(BitConverter.GetBytes(size), header, LengthBytes);
header[LengthBytes] = (byte)opcode;
Console.WriteLine($"Sent {header.Length} bytes with (opcode: {opcode}, header size: {header.Length}, no body)."); try
await _stream.WriteAsync(header, cancellationToken); {
int size = 1;
byte[] header = new byte[LengthBytes + 1];
Array.Copy(BitConverter.GetBytes(size), header, LengthBytes);
header[LengthBytes] = (byte)opcode;
Console.WriteLine($"Sent {header.Length} bytes with (opcode: {opcode}, header size: {header.Length}, no body).");
await _stream.WriteAsync(header, cancellationToken);
}
finally
{
_writerSemaphore.Release();
}
} }
public async Task SendMessageAsync<T>(Opcode opcode, T message, CancellationToken cancellationToken) where T : class public async Task SendMessageAsync<T>(Opcode opcode, T message, CancellationToken cancellationToken) where T : class
{ {
string json = JsonSerializer.Serialize(message); await _writerSemaphore.WaitAsync();
byte[] data = Encoding.UTF8.GetBytes(json);
int size = 1 + data.Length;
byte[] header = new byte[LengthBytes + 1];
Array.Copy(BitConverter.GetBytes(size), header, LengthBytes);
header[LengthBytes] = (byte)opcode;
byte[] packet = new byte[header.Length + data.Length]; try
header.CopyTo(packet, 0); {
data.CopyTo(packet, header.Length); string json = JsonSerializer.Serialize(message);
byte[] data = Encoding.UTF8.GetBytes(json);
int size = 1 + data.Length;
byte[] header = new byte[LengthBytes + 1];
Array.Copy(BitConverter.GetBytes(size), header, LengthBytes);
header[LengthBytes] = (byte)opcode;
Console.WriteLine($"Sent {packet.Length} bytes with (opcode: {opcode}, header size: {header.Length}, body size: {data.Length}, body: {json})."); byte[] packet = new byte[header.Length + data.Length];
await _stream.WriteAsync(packet, cancellationToken); header.CopyTo(packet, 0);
data.CopyTo(packet, header.Length);
Console.WriteLine($"Sent {packet.Length} bytes with (opcode: {opcode}, header size: {header.Length}, body size: {data.Length}, body: {json}).");
await _stream.WriteAsync(packet, cancellationToken);
}
finally
{
_writerSemaphore.Release();
}
} }
public async Task ReceiveLoopAsync(CancellationToken cancellationToken) public async Task ReceiveLoopAsync(CancellationToken cancellationToken)
@ -85,12 +105,22 @@ public class FCastSession : IDisposable
byte[] buffer = new byte[1024]; byte[] buffer = new byte[1024];
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
int bytesRead = await _stream.ReadAsync(buffer, cancellationToken); await _readerSemaphore.WaitAsync();
if (bytesRead == 0)
int bytesRead;
try
{ {
Console.WriteLine("Connection shutdown gracefully."); bytesRead = await _stream.ReadAsync(buffer, cancellationToken);
Dispose(); if (bytesRead == 0)
break; {
Console.WriteLine("Connection shutdown gracefully.");
Dispose();
break;
}
}
finally
{
_readerSemaphore.Release();
} }
await ProcessBytesAsync(buffer, bytesRead, cancellationToken); await ProcessBytesAsync(buffer, bytesRead, cancellationToken);