AsyncWriterStream<TOutput>
sealed class AsyncWriterStream<TOutput> : WriterStream<TOutput> where TOutput : ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFlushable
using DotNext.Buffers;
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace DotNext.IO
{
internal sealed class AsyncWriterStream<TOutput> : WriterStream<TOutput> where TOutput : ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFlushable
{
private const int DefaultTimeout = 4000;
private int timeout = 4000;
private CancellationTokenSource timeoutSource;
public override int WriteTimeout {
get {
return timeout;
}
set {
if (value <= 0)
throw new ArgumentOutOfRangeException("value");
timeout = value;
}
}
public override bool CanTimeout => true;
public AsyncWriterStream(TOutput output)
: base(output)
{
}
[AsyncStateMachine(typeof(AsyncWriterStream<>.<WriteAsync>d__9))]
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken token)
{
<WriteAsync>d__9 stateMachine = default(<WriteAsync>d__9);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.buffer = buffer;
stateMachine.token = token;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start<<WriteAsync>d__9>(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
public override void Write(ReadOnlySpan<byte> buffer)
{
if (!buffer.IsEmpty) {
using (MemoryOwner<byte> memoryOwner = Span.Copy<byte>(buffer, (MemoryAllocator<byte>)null)) {
if (timeoutSource == null)
timeoutSource = new CancellationTokenSource();
timeoutSource.CancelAfter(timeout);
Task task = WriteAsync(memoryOwner.Memory, timeoutSource.Token).AsTask();
try {
task.Wait();
} finally {
task.Dispose();
if (!timeoutSource.TryReset()) {
timeoutSource.Dispose();
timeoutSource = null;
}
}
}
}
}
protected override void Dispose(bool disposing)
{
if (disposing)
timeoutSource?.Dispose();
base.Dispose(disposing);
}
}
}