TaskQueue<T>
Represents a queue of scheduled tasks.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace DotNext.Threading.Tasks
{
[NullableContext(1)]
[Nullable(0)]
public class TaskQueue<[Nullable(0)] T> : IAsyncEnumerable<T> where T : Task
{
private sealed class Signal : TaskCompletionSource
{
public Signal()
: base(TaskCreationOptions.RunContinuationsAsynchronously)
{
}
}
private readonly T[] array;
private int tail;
private int head;
private int count;
private Signal signal;
[Nullable(2)]
private ref T this[int index] {
get {
return ref Unsafe.Add<T>(ref MemoryMarshal.GetArrayDataReference<T>(array), index);
}
}
[Nullable(2)]
public T HeadTask {
[NullableContext(2)]
get {
lock (array) {
return (count > 0) ? this[head] : null;
}
}
}
public bool CanEnqueue {
get {
lock (array) {
return count < array.Length;
}
}
}
public TaskQueue(int capacity)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero<int>(capacity, "capacity");
array = new T[capacity];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ChangeCount([ConstantExpected] bool increment)
{
count += (increment ? 1 : (-1));
if (signal?.TrySetResult() ?? false)
signal = null;
}
private void MoveNext(ref int index)
{
int num = index + 1;
index = ((num != array.Length) ? num : 0);
}
public ValueTask EnsureFreeSpaceAsync(CancellationToken token = default(CancellationToken))
{
Task task = default(Task);
lock (array) {
if (count < array.Length)
task = Task.CompletedTask;
else {
if (signal == null)
signal = new Signal();
task = signal.Task;
}
}
return new ValueTask(task.WaitAsync(token));
}
public bool TryEnqueue(T task)
{
ArgumentNullException.ThrowIfNull((object)task, "task");
lock (array) {
bool result;
if (!(result = (count < array.Length)))
return result;
this[tail] = task;
MoveNext(ref tail);
ChangeCount(true);
return result;
}
}
private bool TryEnqueue(T task, out Task waitTask)
{
lock (array) {
bool result;
if (!(result = (count < array.Length))) {
if (signal == null)
signal = new Signal();
waitTask = signal.Task;
return result;
}
this[tail] = task;
MoveNext(ref tail);
ChangeCount(true);
waitTask = Task.CompletedTask;
return result;
}
}
[AsyncStateMachine(typeof(TaskQueue<>.<EnqueueAsync>d__17))]
public ValueTask EnqueueAsync(T task, CancellationToken token = default(CancellationToken))
{
<EnqueueAsync>d__17 stateMachine = default(<EnqueueAsync>d__17);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
stateMachine.<>4__this = this;
stateMachine.task = task;
stateMachine.token = token;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start<<EnqueueAsync>d__17>(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
private T TryPeekOrDequeue(out int head, out Task enqueueTask, out bool completed)
{
lock (array) {
T result;
if (count <= 0) {
head = 0;
result = null;
completed = false;
if (signal == null)
signal = new Signal();
enqueueTask = signal.Task;
return result;
}
result = this[head = this.head];
enqueueTask = Task.CompletedTask;
bool num = result?.IsCompleted ?? false;
bool flag = num;
completed = num;
if (!flag)
return result;
MoveNext(ref head);
ChangeCount(false);
return result;
}
}
private bool TryDequeue(int expectedHead, T task)
{
lock (array) {
ref T reference = ref this[expectedHead];
bool result;
if (!(result = (count > 0 && head == expectedHead && reference == task)))
return result;
MoveNext(ref head);
reference = null;
ChangeCount(false);
return result;
}
}
[NullableContext(2)]
public bool TryDequeue([NotNullWhen(true)] out T task)
{
lock (array) {
ref T reference = ref this[head];
task = reference;
if (count <= 0)
goto IL_006b;
T val = task;
if (val == null || !val.IsCompleted)
goto IL_006b;
MoveNext(ref head);
reference = null;
ChangeCount(false);
goto end_IL_0009;
IL_006b:
task = null;
end_IL_0009:;
}
return task != null;
}
[AsyncStateMachine(typeof(TaskQueue<>.<DequeueAsync>d__21))]
[return: Nullable(new byte[] {
0,
1
})]
public ValueTask<T> DequeueAsync(CancellationToken token = default(CancellationToken))
{
<DequeueAsync>d__21 stateMachine = default(<DequeueAsync>d__21);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder<T>.Create();
stateMachine.<>4__this = this;
stateMachine.token = token;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncStateMachine(typeof(TaskQueue<>.<TryDequeueAsync>d__22))]
[return: Nullable(new byte[] {
0,
2
})]
public ValueTask<T> TryDequeueAsync(CancellationToken token = default(CancellationToken))
{
<TryDequeueAsync>d__22 stateMachine = default(<TryDequeueAsync>d__22);
stateMachine.<>t__builder = AsyncValueTaskMethodBuilder<T>.Create();
stateMachine.<>4__this = this;
stateMachine.token = token;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
[AsyncIteratorStateMachine(typeof(TaskQueue<>.<GetAsyncEnumerator>d__23))]
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token)
{
<GetAsyncEnumerator>d__23 <GetAsyncEnumerator>d__ = new <GetAsyncEnumerator>d__23(-3);
<GetAsyncEnumerator>d__.<>4__this = this;
<GetAsyncEnumerator>d__.token = token;
return <GetAsyncEnumerator>d__;
}
public void Clear()
{
lock (array) {
head = (tail = (count = 0));
Array.Clear((Array)array);
if (signal?.TrySetResult() ?? false)
signal = null;
}
}
void Reset()
{
Clear();
}
}
}