DotNext.Threading by .NET Foundation and Contributors

<PackageReference Include="DotNext.Threading" Version="5.12.0" />

 TaskQueue<T>

public class TaskQueue<T> : IAsyncEnumerable<T>, IResettable where T : Task
Represents a queue of scheduled tasks.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace DotNext.Threading.Tasks
{
    /// <summary>
    /// Represents a queue of scheduled tasks.
    /// </summary>
    /// <remarks>
    /// The queue is a fixed-capacity circular buffer over <c>array</c>; <c>head</c> and <c>tail</c>
    /// wrap to zero when they reach the array length. Every access to the mutable state in this
    /// class happens inside <c>lock (array)</c>.
    /// NOTE(review): this is decompiler output. The async/iterator members below are stubs that
    /// only start compiler-generated state machines whose bodies are not part of this listing.
    /// </remarks>
    /// <typeparam name="T">The type of the tasks stored in the queue.</typeparam>
    [NullableContext(1)]
    [Nullable(0)]
    public class TaskQueue<[Nullable(0)] T> : IAsyncEnumerable<T> where T : Task
    {
        /// <summary>
        /// Completion source used to wake waiters; continuations are forced to run asynchronously.
        /// </summary>
        private sealed class Signal : TaskCompletionSource
        {
            public Signal()
                : base(TaskCreationOptions.RunContinuationsAsynchronously)
            {
            }
        }

        // Backing storage; also serves as the lock object for all state below.
        private readonly T[] array;

        // Index of the slot the next enqueued task is written to.
        private int tail;

        // Index of the slot holding the oldest task.
        private int head;

        // Number of tasks currently stored.
        private int count;

        // Lazily created; completed and cleared by ChangeCount/Clear.
        private Signal signal;

        /// <summary>
        /// Returns a reference to the slot at <paramref name="index"/>.
        /// </summary>
        /// <remarks>
        /// Goes through <see cref="Unsafe.Add{T}(ref T, int)"/>, so no bounds check is performed here;
        /// callers must pass an index inside the array.
        /// </remarks>
        [Nullable(2)]
        private ref T this[int index]
        {
            get
            {
                return ref Unsafe.Add<T>(ref MemoryMarshal.GetArrayDataReference<T>(array), index);
            }
        }

        /// <summary>
        /// Gets the task at the head of the queue, or <c>null</c> when the queue is empty.
        /// </summary>
        [Nullable(2)]
        public T HeadTask
        {
            [NullableContext(2)]
            get
            {
                lock (array)
                {
                    return (count > 0) ? this[head] : null;
                }
            }
        }

        /// <summary>
        /// Gets a value indicating whether the queue currently holds fewer tasks than its capacity.
        /// </summary>
        public bool CanEnqueue
        {
            get
            {
                lock (array)
                {
                    return count < array.Length;
                }
            }
        }

        /// <summary>
        /// Initializes a queue with a fixed capacity.
        /// </summary>
        /// <param name="capacity">The maximum number of tasks the queue can hold.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative or zero.</exception>
        public TaskQueue(int capacity)
        {
            ArgumentOutOfRangeException.ThrowIfNegativeOrZero<int>(capacity, nameof(capacity));
            array = new T[capacity];
        }

        /// <summary>
        /// Adjusts <c>count</c> by one and completes the pending signal, if any.
        /// </summary>
        /// <remarks>Every caller in this class invokes it while holding the lock on <c>array</c>.</remarks>
        /// <param name="increment"><c>true</c> to add one to the count; <c>false</c> to subtract one.</param>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private void ChangeCount([ConstantExpected] bool increment)
        {
            count += (increment ? 1 : (-1));

            // A signal that was successfully completed is dropped so the next waiter creates a fresh one.
            if (signal?.TrySetResult() ?? false)
            {
                signal = null;
            }
        }

        /// <summary>
        /// Advances <paramref name="index"/> by one, wrapping to zero at the end of the array.
        /// </summary>
        private void MoveNext(ref int index)
        {
            int next = index + 1;
            index = (next != array.Length) ? next : 0;
        }

        /// <summary>
        /// Returns a task that is already completed when the queue has free space, or the task of the
        /// pending signal otherwise. Either task is observed through <c>WaitAsync(token)</c>.
        /// </summary>
        /// <param name="token">The token passed to <c>WaitAsync</c>.</param>
        public ValueTask EnsureFreeSpaceAsync(CancellationToken token = default(CancellationToken))
        {
            Task task;
            lock (array)
            {
                if (count < array.Length)
                {
                    task = Task.CompletedTask;
                }
                else
                {
                    signal ??= new Signal();
                    task = signal.Task;
                }
            }

            return new ValueTask(task.WaitAsync(token));
        }

        /// <summary>
        /// Attempts to append <paramref name="task"/> to the queue.
        /// </summary>
        /// <param name="task">The task to enqueue.</param>
        /// <returns><c>true</c> if the task was stored; <c>false</c> if the queue is full.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        public bool TryEnqueue(T task)
        {
            ArgumentNullException.ThrowIfNull((object)task, nameof(task));
            lock (array)
            {
                if (count >= array.Length)
                {
                    return false;
                }

                this[tail] = task;
                MoveNext(ref tail);
                ChangeCount(true);
                return true;
            }
        }

        /// <summary>
        /// Attempts to append <paramref name="task"/>; when the queue is full, hands back the task of
        /// the pending signal through <paramref name="waitTask"/> instead.
        /// </summary>
        /// <param name="task">The task to enqueue. Not null-checked here.</param>
        /// <param name="waitTask">
        /// <see cref="Task.CompletedTask"/> on success; otherwise the task of the pending signal.
        /// </param>
        /// <returns><c>true</c> if the task was stored; <c>false</c> if the queue is full.</returns>
        private bool TryEnqueue(T task, out Task waitTask)
        {
            lock (array)
            {
                if (count >= array.Length)
                {
                    signal ??= new Signal();
                    waitTask = signal.Task;
                    return false;
                }

                this[tail] = task;
                MoveNext(ref tail);
                ChangeCount(true);
                waitTask = Task.CompletedTask;
                return true;
            }
        }

        // Decompiler-emitted stub: it only configures and starts the compiler-generated state
        // machine. The method's logic lives in that state machine, which is not visible here.
        [AsyncStateMachine(typeof(TaskQueue<>.<EnqueueAsync>d__17))]
        public ValueTask EnqueueAsync(T task, CancellationToken token = default(CancellationToken))
        {
            <EnqueueAsync>d__17 stateMachine = default(<EnqueueAsync>d__17);
            stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
            stateMachine.<>4__this = this;
            stateMachine.task = task;
            stateMachine.token = token;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start<<EnqueueAsync>d__17>(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        /// <summary>
        /// Reads the head task and removes it from the queue if it has already completed.
        /// </summary>
        /// <param name="head">
        /// The index the task was read from, or 0 when the queue is empty.
        /// </param>
        /// <param name="enqueueTask">
        /// <see cref="Task.CompletedTask"/> when the queue is not empty; otherwise the task of the
        /// pending signal.
        /// </param>
        /// <param name="completed">
        /// <c>true</c> if the head task was completed and has been removed; otherwise <c>false</c>.
        /// </param>
        /// <returns>The head task, or <c>null</c> when the queue is empty.</returns>
        private T TryPeekOrDequeue(out int head, out Task enqueueTask, out bool completed)
        {
            lock (array)
            {
                if (count <= 0)
                {
                    head = 0;
                    completed = false;
                    signal ??= new Signal();
                    enqueueTask = signal.Task;
                    return null;
                }

                T result = this[head = this.head];
                enqueueTask = Task.CompletedTask;
                completed = result?.IsCompleted ?? false;
                if (completed)
                {
                    // The out parameter 'head' shadows the field of the same name. The field must be
                    // advanced here; advancing the parameter would leave the queue's head index
                    // unchanged while the count is decremented.
                    MoveNext(ref this.head);
                    ChangeCount(false);
                }

                return result;
            }
        }

        /// <summary>
        /// Removes the head task only if the queue still looks the way the caller last saw it.
        /// </summary>
        /// <param name="expectedHead">The head index the caller observed earlier.</param>
        /// <param name="task">The task the caller observed at that index.</param>
        /// <returns>
        /// <c>true</c> if the queue was non-empty, the head index still equals
        /// <paramref name="expectedHead"/> and that slot still holds <paramref name="task"/>, in which
        /// case the task has been removed; otherwise <c>false</c>.
        /// </returns>
        private bool TryDequeue(int expectedHead, T task)
        {
            lock (array)
            {
                ref T slot = ref this[expectedHead];
                if (!(count > 0 && head == expectedHead && slot == task))
                {
                    return false;
                }

                MoveNext(ref head);
                slot = null;
                ChangeCount(false);
                return true;
            }
        }

        /// <summary>
        /// Attempts to remove the head task, which succeeds only if that task has completed.
        /// </summary>
        /// <param name="task">The removed task on success; otherwise <c>null</c>.</param>
        /// <returns>
        /// <c>true</c> if a completed task was removed; <c>false</c> if the queue is empty or its head
        /// task has not completed.
        /// </returns>
        [NullableContext(2)]
        public bool TryDequeue([NotNullWhen(true)] out T task)
        {
            lock (array)
            {
                // Captured before the head index moves, so it keeps pointing at the dequeued slot.
                ref T slot = ref this[head];
                task = slot;
                if (count > 0 && task != null && task.IsCompleted)
                {
                    MoveNext(ref head);
                    slot = null;
                    ChangeCount(false);
                }
                else
                {
                    task = null;
                }
            }

            return task != null;
        }

        // Decompiler-emitted stub: it only configures and starts the compiler-generated state
        // machine. The method's logic lives in that state machine, which is not visible here.
        [AsyncStateMachine(typeof(TaskQueue<>.<DequeueAsync>d__21))]
        [return: Nullable(new byte[] { 0, 1 })]
        public ValueTask<T> DequeueAsync(CancellationToken token = default(CancellationToken))
        {
            <DequeueAsync>d__21 stateMachine = default(<DequeueAsync>d__21);
            stateMachine.<>t__builder = AsyncValueTaskMethodBuilder<T>.Create();
            stateMachine.<>4__this = this;
            stateMachine.token = token;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Decompiler-emitted stub: it only configures and starts the compiler-generated state
        // machine. The method's logic lives in that state machine, which is not visible here.
        [AsyncStateMachine(typeof(TaskQueue<>.<TryDequeueAsync>d__22))]
        [return: Nullable(new byte[] { 0, 2 })]
        public ValueTask<T> TryDequeueAsync(CancellationToken token = default(CancellationToken))
        {
            <TryDequeueAsync>d__22 stateMachine = default(<TryDequeueAsync>d__22);
            stateMachine.<>t__builder = AsyncValueTaskMethodBuilder<T>.Create();
            stateMachine.<>4__this = this;
            stateMachine.token = token;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        // Decompiler-emitted stub: it only creates the compiler-generated async iterator and hands
        // it the queue and the token. The iteration logic is not visible here.
        [AsyncIteratorStateMachine(typeof(TaskQueue<>.<GetAsyncEnumerator>d__23))]
        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token)
        {
            <GetAsyncEnumerator>d__23 <GetAsyncEnumerator>d__ = new <GetAsyncEnumerator>d__23(-3);
            <GetAsyncEnumerator>d__.<>4__this = this;
            <GetAsyncEnumerator>d__.token = token;
            return <GetAsyncEnumerator>d__;
        }

        /// <summary>
        /// Removes every task from the queue and completes the pending signal, if any.
        /// </summary>
        public void Clear()
        {
            lock (array)
            {
                head = (tail = (count = 0));
                Array.Clear((Array)array);
                if (signal?.TrySetResult() ?? false)
                {
                    signal = null;
                }
            }
        }

        // NOTE(review): the package listing declares this class as implementing IResettable, which
        // the decompiled declaration above omits. This was presumably an explicit interface
        // implementation (IResettable.Reset) before decompilation; confirm against the package.
        void Reset()
        {
            Clear();
        }
    }
}