StreamSource
Represents Stream factory methods.
using DotNext.Buffers;
using DotNext.Runtime.CompilerServices;
using System;
using System.Buffers;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace DotNext.IO
{
[NullableContext(1)]
[Nullable(0)]
public static class StreamSource
{
[StructLayout(LayoutKind.Auto)]
private readonly struct ReadOnlySpanWriter<TArg> : IReadOnlySpanConsumer<byte>, ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFunctional<Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask>>, IFlushable
{
[CompilerGenerated]
private readonly ReadOnlySpanAction<byte, TArg> <output>P;
[CompilerGenerated]
private readonly TArg <arg>P;
[CompilerGenerated]
private readonly Action<TArg> <flush>P;
[CompilerGenerated]
private readonly Func<TArg, CancellationToken, Task> <flushAsync>P;
public ReadOnlySpanWriter(ReadOnlySpanAction<byte, TArg> output, TArg arg, Action<TArg> flush, Func<TArg, CancellationToken, Task> flushAsync)
{
<output>P = output;
<arg>P = arg;
<flush>P = flush;
<flushAsync>P = flushAsync;
}
void IFlushable.Flush()
{
StreamSource.Flush<TArg>(<flush>P, <flushAsync>P, <arg>P);
}
Task IFlushable.FlushAsync(CancellationToken token)
{
return StreamSource.FlushAsync<TArg>(<flush>P, <flushAsync>P, <arg>P, token);
}
void IReadOnlySpanConsumer<byte>.Invoke(ReadOnlySpan<byte> input)
{
<output>P(input, <arg>P);
}
}
[StructLayout(LayoutKind.Auto)]
private readonly struct ReadOnlyMemoryWriter<TArg> : ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFunctional<Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask>>, IFlushable
{
[CompilerGenerated]
private readonly Func<ReadOnlyMemory<byte>, TArg, CancellationToken, ValueTask> <output>P;
[CompilerGenerated]
private readonly TArg <arg>P;
[CompilerGenerated]
private readonly Action<TArg> <flush>P;
[CompilerGenerated]
private readonly Func<TArg, CancellationToken, Task> <flushAsync>P;
public ReadOnlyMemoryWriter(Func<ReadOnlyMemory<byte>, TArg, CancellationToken, ValueTask> output, TArg arg, Action<TArg> flush, Func<TArg, CancellationToken, Task> flushAsync)
{
<output>P = output;
<arg>P = arg;
<flush>P = flush;
<flushAsync>P = flushAsync;
}
void IFlushable.Flush()
{
StreamSource.Flush<TArg>(<flush>P, <flushAsync>P, <arg>P);
}
Task IFlushable.FlushAsync(CancellationToken token)
{
return StreamSource.FlushAsync<TArg>(<flush>P, <flushAsync>P, <arg>P, token);
}
ValueTask ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>.Invoke(ReadOnlyMemory<byte> input, CancellationToken token)
{
return <output>P(input, <arg>P, token);
}
}
[StructLayout(LayoutKind.Auto)]
private readonly struct BufferWriter<TBuffer> : IReadOnlySpanConsumer<byte>, ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFunctional<Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask>>, IFlushable where TBuffer : class, IBufferWriter<byte>
{
[CompilerGenerated]
private readonly TBuffer <output>P;
[CompilerGenerated]
private readonly Action<TBuffer> <flush>P;
[CompilerGenerated]
private readonly Func<TBuffer, CancellationToken, Task> <flushAsync>P;
public BufferWriter(TBuffer output, Action<TBuffer> flush, Func<TBuffer, CancellationToken, Task> flushAsync)
{
<output>P = output;
<flush>P = flush;
<flushAsync>P = flushAsync;
}
void IFlushable.Flush()
{
StreamSource.Flush<TBuffer>(<flush>P, <flushAsync>P, <output>P);
}
Task IFlushable.FlushAsync(CancellationToken token)
{
return StreamSource.FlushAsync<TBuffer>(<flush>P, <flushAsync>P, <output>P, token);
}
void IReadOnlySpanConsumer<byte>.Invoke(ReadOnlySpan<byte> input)
{
BuffersExtensions.Write<byte>((IBufferWriter<byte>)<output>P, input);
}
}
[StructLayout(LayoutKind.Auto)]
private readonly struct DelegatingWriter<TBuffer> : IReadOnlySpanConsumer<byte>, ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFunctional<Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask>>, IFlushable where TBuffer : class, IBufferWriter<byte>
{
[CompilerGenerated]
private readonly TBuffer <output>P;
[CompilerGenerated]
private readonly Action<TBuffer> <flush>P;
[CompilerGenerated]
private readonly Func<TBuffer, CancellationToken, Task> <flushAsync>P;
public DelegatingWriter(TBuffer output, Action<TBuffer> flush, Func<TBuffer, CancellationToken, Task> flushAsync)
{
<output>P = output;
<flush>P = flush;
<flushAsync>P = flushAsync;
}
void IFlushable.Flush()
{
StreamSource.Flush<TBuffer>(<flush>P, <flushAsync>P, <output>P);
}
Task IFlushable.FlushAsync(CancellationToken token)
{
return StreamSource.FlushAsync<TBuffer>(<flush>P, <flushAsync>P, <output>P, token);
}
void IReadOnlySpanConsumer<byte>.Invoke(ReadOnlySpan<byte> input)
{
Unsafe.As<IReadOnlySpanConsumer<byte>>((object)<output>P).Invoke(input);
}
}
[NullableContext(0)]
[return: Nullable(1)]
public static Stream AsStream(this ArraySegment<byte> segment, bool writable = false)
{
if (!segment.Array.IsNullOrEmpty())
return new MemoryStream(segment.Array, segment.Offset, segment.Count, writable, false);
return Stream.Null;
}
[NullableContext(0)]
[return: Nullable(1)]
public static Stream AsStream(this ReadOnlySequence<byte> sequence)
{
if (sequence.IsEmpty)
return Stream.Null;
if (SequenceMarshal.TryGetArray(sequence, out ArraySegment<byte> segment))
return segment.AsStream(false);
return new ReadOnlyMemoryStream(sequence);
}
[NullableContext(0)]
[return: Nullable(1)]
public static Stream AsStream(this ReadOnlyMemory<byte> memory)
{
return new ReadOnlySequence<byte>(memory).AsStream();
}
[NullableContext(0)]
[return: Nullable(1)]
public static Stream (this ReadOnlySequence<byte> sequence, bool compatWithAsync = true)
{
if (!sequence.IsEmpty) {
if (!compatWithAsync)
return SharedReadOnlyMemoryStream.CreateThreadLocalStream(sequence);
return SharedReadOnlyMemoryStream.CreateAsyncLocalStream(sequence);
}
return Stream.Null;
}
public static Stream AsSynchronousStream<TOutput>(TOutput output) where TOutput : IFlushable, IReadOnlySpanConsumer<byte>
{
return new SyncWriterStream<TOutput>(output);
}
public static Stream AsStream<[Nullable(2)] TArg>(this ReadOnlySpanAction<byte, TArg> writer, TArg arg, [Nullable(new byte[] {
2,
1
})] Action<TArg> flush = null, [Nullable(new byte[] {
2,
1,
1
})] Func<TArg, CancellationToken, Task> flushAsync = null)
{
if (writer == null)
throw new ArgumentNullException("writer");
return AsSynchronousStream(new ReadOnlySpanWriter<TArg>(writer, arg, flush, flushAsync));
}
public static Stream AsStream<TWriter>(this TWriter writer, [Nullable(new byte[] {
2,
1
})] Action<TWriter> flush = null, [Nullable(new byte[] {
2,
1,
1
})] Func<TWriter, CancellationToken, Task> flushAsync = null) where TWriter : class, IBufferWriter<byte>
{
((IFlushable)).DiscoverFlushMethods(writer, ref flush, ref flushAsync);
if (!(writer is IReadOnlySpanConsumer<byte>))
return AsSynchronousStream(new BufferWriter<TWriter>(writer, flush, flushAsync));
return AsSynchronousStream(new DelegatingWriter<TWriter>(writer, flush, flushAsync));
}
public static Stream AsStream(this SparseBufferWriter<byte> writer, bool readable)
{
if (!readable)
return writer.AsStream(null, null);
SparseBufferWriter<byte>.MemoryChunk firstChunk = writer.FirstChunk;
if (firstChunk == null)
return Stream.Null;
if (firstChunk.Next == null)
return firstChunk.WrittenMemory.AsStream();
return new SparseMemoryStream(writer);
}
public static Stream AsAsynchronousStream<TOutput>(TOutput output) where TOutput : ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>, IFlushable
{
return new AsyncWriterStream<TOutput>(output);
}
public static Stream AsStream<[Nullable(2)] TArg>([Nullable(new byte[] {
1,
0,
1
})] this Func<ReadOnlyMemory<byte>, TArg, CancellationToken, ValueTask> writer, TArg arg, [Nullable(new byte[] {
2,
1
})] Action<TArg> flush = null, [Nullable(new byte[] {
2,
1,
1
})] Func<TArg, CancellationToken, Task> flushAsync = null)
{
ArgumentNullException.ThrowIfNull((object)writer, "writer");
return AsAsynchronousStream(new ReadOnlyMemoryWriter<TArg>(writer, arg, flush, flushAsync));
}
public static Stream AsStream<[Nullable(2)] TArg>([Nullable(new byte[] {
1,
0,
1,
0
})] this Func<Memory<byte>, TArg, CancellationToken, ValueTask<int>> reader, TArg arg)
{
ArgumentNullException.ThrowIfNull((object)reader, "reader");
return new ReadOnlyStream<TArg>(reader, arg);
}
private static void Flush<TArg>(Action<TArg> flush, Func<TArg, CancellationToken, Task> flushAsync, TArg arg)
{
if (flush == null)
flushAsync?.Invoke(arg, CancellationToken.None).ConfigureAwait(false).GetAwaiter()
.GetResult();
else
flush(arg);
}
private static Task FlushAsync<TArg>(Action<TArg> flush, Func<TArg, CancellationToken, Task> flushAsync, TArg arg, CancellationToken token)
{
if (flushAsync == null) {
if (flush != null)
return Task.Factory.StartNew(<FlushAsync>g__CreateTaskCallback|16_0(flush, arg), token, TaskCreationOptions.None, TaskScheduler.Current);
return Task.CompletedTask;
}
return flushAsync(arg, token);
}
}
}