ManualResetCompletionSource
Represents the base class for a producer of value tasks.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
namespace DotNext.Threading.Tasks
{
[NullableContext(2)]
[Nullable(0)]
public abstract class ManualResetCompletionSource
{
/// <summary>
/// Immutable bundle of a continuation callback, its state argument and the
/// contexts (scheduling context and execution context) that were captured
/// according to the <see cref="ValueTaskSourceOnCompletedFlags"/> supplied at construction.
/// </summary>
[StructLayout(LayoutKind.Auto)]
private readonly struct Continuation : IThreadPoolWorkItem
{
    private readonly Action<object> action;
    private readonly object state;

    // Either a SynchronizationContext, a TaskScheduler, or null (see Invoke()).
    private readonly object schedulingContext;

    // Non-null only when FlowExecutionContext was requested and a context could be captured.
    private readonly ExecutionContext context;

    /// <summary>
    /// Creates the continuation and captures the contexts requested by <paramref name="flags"/>.
    /// </summary>
    /// <param name="action">The callback to run.</param>
    /// <param name="state">The argument passed to <paramref name="action"/>.</param>
    /// <param name="flags">Selects which contexts are captured.</param>
    public Continuation(Action<object> action, object state, ValueTaskSourceOnCompletedFlags flags)
    {
        this.action = action;
        this.state = state;

        // The scheduling context is captured first, then the execution context.
        schedulingContext = (((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0) ? <.ctor>g__CaptureSchedulingContext|4_0() : null);
        context = (((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) ? ExecutionContext.Capture() : null);
    }

    /// <summary>
    /// Gets a value indicating whether a callback is present (a default instance has none).
    /// </summary>
    public bool IsValid => action is not null;

    /// <summary>
    /// Runs the callback without restoring the captured execution context on the calling thread.
    /// </summary>
    /// <param name="runAsynchronously">
    /// When no scheduling context was captured: <see langword="true"/> queues the callback to the
    /// thread pool, <see langword="false"/> calls it inline.
    /// </param>
    public void InvokeOnCurrentContext(bool runAsynchronously)
    {
        if (schedulingContext is not null)
        {
            Invoke();
            return;
        }

        if (!runAsynchronously)
        {
            action(state);
            return;
        }

        // The flowing overload is used only when an execution context was captured.
        if (context is null)
        {
            ThreadPool.UnsafeQueueUserWorkItem(action, state, true);
        }
        else
        {
            ThreadPool.QueueUserWorkItem(action, state, true);
        }
    }

    /// <summary>
    /// Runs the callback under the execution context captured at construction (if any).
    /// </summary>
    /// <param name="runAsynchronously">
    /// When no scheduling context was captured: <see langword="true"/> queues the callback to the
    /// thread pool, <see langword="false"/> calls it inline.
    /// </param>
    public void InvokeOnCapturedContext(bool runAsynchronously)
    {
        if (schedulingContext is not null)
        {
            InvokeOnSchedulingContext();
            return;
        }

        bool flowContext = context is not null;

        if (!runAsynchronously)
        {
            if (flowContext)
            {
                // The Action<object> instance is reinterpreted as ContextCallback without allocating a new delegate.
                ExecutionContext.Run(context, Unsafe.As<ContextCallback>((object)action), state);
            }
            else
            {
                action(state);
            }

            return;
        }

        if (flowContext)
        {
            // Queues this struct itself as the work item; Execute() restores the captured context.
            ThreadPool.UnsafeQueueUserWorkItem(this, true);
        }
        else
        {
            ThreadPool.UnsafeQueueUserWorkItem(action, state, true);
        }
    }

    // Dispatches to the scheduling context while the captured execution context is temporarily current.
    private void InvokeOnSchedulingContext()
    {
        ExecutionContext captured = context;
        if (captured is null)
        {
            Invoke();
            return;
        }

        ExecutionContext previous = ExecutionContext.Capture();
        ExecutionContext.Restore(captured);
        try
        {
            Invoke();
        }
        finally
        {
            // The previous context is put back only if one could be captured.
            if (previous is not null)
            {
                ExecutionContext.Restore(previous);
            }
        }
    }

    // Posts the callback to the captured SynchronizationContext or starts it on the captured
    // TaskScheduler; does nothing when the scheduling context is neither of the two.
    private void Invoke()
    {
        switch (schedulingContext)
        {
            case SynchronizationContext syncContext:
                syncContext.Post(action.Invoke, state);
                break;
            case TaskScheduler scheduler:
                Task.Factory.StartNew(action, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler);
                break;
        }
    }

    // Work-item entry point used when this struct is queued by InvokeOnCapturedContext.
    void IThreadPoolWorkItem.Execute()
    {
        ExecutionContext.Restore(context);
        action(state);
    }
}
/// <summary>
/// Packs the completion token (version) and the source status into one 64-bit word so that
/// both can be read or swapped atomically.
/// </summary>
/// <remarks>
/// Layout of the word as addressed by <c>GetVersion</c>/<c>GetStatus</c>: the version occupies the
/// first 16 bits, the status occupies the second 32-bit half, and the 16 bits in between are unused.
/// The unused bits must always be zero because <see cref="Consume"/> compares the whole word.
/// </remarks>
[StructLayout(LayoutKind.Auto)]
private protected struct VersionAndStatus
{
    private ulong value;

    /// <summary>
    /// Initializes the state with version <see cref="short.MinValue"/> and status
    /// <c>WaitForActivation</c>.
    /// </summary>
    public VersionAndStatus()
    {
        this = new VersionAndStatus(short.MinValue, ManualResetCompletionSourceStatus.WaitForActivation);
    }

    private VersionAndStatus(short version, ManualResetCompletionSourceStatus status)
    {
        value = Combine(version, status);
    }

    /// <summary>Gets the version part of the packed word.</summary>
    public short Version => GetVersion(ref value);

    /// <summary>Gets or sets the status part of the packed word (the version is left untouched).</summary>
    public ManualResetCompletionSourceStatus Status
    {
        get => GetStatus(ref value);
        set => GetStatus(ref this.value) = value;
    }

    /// <summary>
    /// Gets a value indicating whether the status is <c>WaitForConsumption</c> or any greater value.
    /// </summary>
    public bool IsCompleted => Status >= ManualResetCompletionSourceStatus.WaitForConsumption;

    /// <summary>Returns a copy of this state obtained with a single volatile 64-bit read.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public VersionAndStatus VolatileRead()
    {
        VersionAndStatus snapshot = new VersionAndStatus();
        snapshot.value = Volatile.Read(ref value);
        return snapshot;
    }

    /// <summary>
    /// Checks whether the source may still be completed.
    /// </summary>
    /// <param name="token">Expected version, or <see langword="null"/> to skip the version check.</param>
    /// <returns>
    /// <see langword="true"/> if the numeric status is 0 or 1 and <paramref name="token"/> is either
    /// absent or equal to the current version; otherwise, <see langword="false"/>.
    /// </returns>
    public bool CanBeCompleted(short? token)
    {
        short version = Version;
        ManualResetCompletionSourceStatus status = Status;

        // Unsigned comparison: any status outside the range [0, 1] (including negative values) is rejected.
        if ((uint)status > 1)
        {
            return false;
        }

        return token.GetValueOrDefault(version) == version;
    }

    /// <summary>
    /// Atomically moves the state from <c>WaitForConsumption</c> to <c>Consumed</c> for the given version.
    /// </summary>
    /// <param name="version">The version the caller expects.</param>
    /// <exception cref="InvalidOperationException">
    /// The observed status was not <c>WaitForConsumption</c>, or the observed version differs from
    /// <paramref name="version"/>.
    /// </exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void Consume(short version)
    {
        ulong observed = Interlocked.CompareExchange(ref value, Combine(version, ManualResetCompletionSourceStatus.Consumed), Combine(version, ManualResetCompletionSourceStatus.WaitForConsumption));

        // The status is inspected first, so a wrong state is reported in preference to a wrong token.
        if (GetStatus(ref observed) != ManualResetCompletionSourceStatus.WaitForConsumption)
        {
            throw new InvalidOperationException(ExceptionMessages.InvalidSourceState);
        }

        if (GetVersion(ref observed) != version)
        {
            throw new InvalidOperationException(ExceptionMessages.InvalidSourceToken);
        }
    }

    /// <summary>
    /// Increments the version and sets the status back to <c>WaitForActivation</c>.
    /// </summary>
    /// <returns>The new version.</returns>
    public short Reset()
    {
        short version = GetVersion(ref value);
        value = Combine(version = (short)(version + 1), ManualResetCompletionSourceStatus.WaitForActivation);
        return version;
    }

    // Reinterprets the start of the 64-bit word as the 16-bit version.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static ref short GetVersion(ref ulong value)
    {
        return ref Unsafe.As<ulong, short>(ref value);
    }

    // Reinterprets the second 32-bit half of the word as the status.
    // NOTE(review): this treats the enum as 32 bits wide; the enum declaration is not visible here - confirm.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static ref ManualResetCompletionSourceStatus GetStatus(ref ulong value)
    {
        return ref Unsafe.As<int, ManualResetCompletionSourceStatus>(ref Unsafe.Add<int>(ref Unsafe.As<ulong, int>(ref value), 1));
    }

    // Builds a packed word from its two parts. The local is explicitly zeroed (no SkipInit) so the
    // 16 unused bits are deterministic; otherwise the full-width compare in Consume() could never match.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static ulong Combine(short version, ManualResetCompletionSourceStatus status)
    {
        ulong packed = 0UL;
        GetVersion(ref packed) = version;
        GetStatus(ref packed) = status;
        return packed;
    }
}
/// <summary>
/// Holds the cancellation-token registration and the timeout source associated with one activation.
/// </summary>
[StructLayout(LayoutKind.Auto)]
private struct CancellationState : IDisposable
{
    private CancellationTokenRegistration tokenTracker;
    private CancellationTokenSource timeoutSource;

    /// <summary>
    /// Registers <paramref name="callback"/> with the token and/or a timeout timer and marks the
    /// source as <c>Activated</c>.
    /// </summary>
    /// <param name="vs">The version/status word; its status is set to <c>Activated</c>.</param>
    /// <param name="callback">The callback to register; it receives the boxed current version as its state.</param>
    /// <param name="timeout">A timer is armed only when this value is greater than zero.</param>
    /// <param name="token">The token to track, when it can be canceled.</param>
    internal void Initialize(ref VersionAndStatus vs, Action<object, CancellationToken> callback, TimeSpan timeout, CancellationToken token)
    {
        // Boxed once and shared by both registrations when the token registration happens.
        IEquatable<short> boxedVersion = null;

        if (token.CanBeCanceled)
        {
            boxedVersion = vs.Version;
            tokenTracker = token.UnsafeRegister(callback, (object)boxedVersion);
        }

        // The status changes only after the token registration above.
        vs.Status = ManualResetCompletionSourceStatus.Activated;

        if (timeout <= default(TimeSpan) || token.IsCancellationRequested)
        {
            return;
        }

        // A timeout source left over from an earlier activation is reused.
        if (timeoutSource == null)
        {
            timeoutSource = new CancellationTokenSource();
        }

        timeoutSource.Token.UnsafeRegister(callback, boxedVersion ?? ((object)vs.Version));
        timeoutSource.CancelAfter(timeout);
    }

    /// <summary>
    /// Determines whether <paramref name="token"/> is the token of the internal timeout source.
    /// </summary>
    /// <param name="token">The token to test.</param>
    /// <returns><see langword="false"/> when there is no timeout source or the tokens differ.</returns>
    [IsReadOnly]
    internal bool IsTimeoutToken(CancellationToken token)
    {
        return timeoutSource is { } source && source.Token == token;
    }

    /// <summary>
    /// Moves the disposable parts out of this instance so the caller can dispose them.
    /// </summary>
    /// <returns>
    /// A state holding the token registration and, only when the timeout source could not be reset
    /// for reuse, the timeout source as well.
    /// </returns>
    internal CancellationState Detach()
    {
        CancellationState detached = default(CancellationState);
        detached.tokenTracker = tokenTracker;

        // A timeout source that was reset successfully stays in this instance for the next activation.
        if (timeoutSource is { } source && !source.TryReset())
        {
            detached.timeoutSource = source;
            timeoutSource = null;
        }

        tokenTracker = default(CancellationTokenRegistration);
        return detached;
    }

    /// <summary>Unregisters the token callback and disposes the timeout source, if present.</summary>
    [IsReadOnly]
    public void Dispose()
    {
        tokenTracker.Unregister();

        CancellationTokenSource source = timeoutSource;
        if (source != null)
        {
            source.Dispose();
        }
    }
}
/// <summary>
/// The completion token of a newly constructed source (<see cref="short.MinValue"/>, the same value
/// the parameterless <c>VersionAndStatus</c> constructor uses as the first version).
/// </summary>
protected const short InitialCompletionToken = short.MinValue;
// Cached delegate for CancellationRequested; passed to CancellationState.Initialize on activation.
private readonly Action<object, CancellationToken> cancellationCallback;
// Forwarded to Continuation.InvokeOnCapturedContext / InvokeOnCurrentContext.
private readonly bool runContinuationsAsynchronously;
// Token registration and timeout source of the current activation.
private CancellationState state;
// Continuation stored by OnCompleted while the source is Activated; cleared by Resume.
private Continuation continuation;
// Packed version (completion token) and status of the source.
private protected VersionAndStatus versionAndStatus;
// Lock flag used by EnterLock/TryEnterLock/ExitLock: 0 = free, 1 = held.
private int lockState;
/// <summary>
/// Gets or sets arbitrary data associated with the completion; written by <c>SetResult</c> and
/// cleared on reset.
/// </summary>
protected object CompletionData { get; set; }
/// <summary>Gets the current status, obtained via a volatile read of the state word.</summary>
public ManualResetCompletionSourceStatus Status => versionAndStatus.VolatileRead().Status;
/// <summary>
/// Gets a value indicating whether the status is <c>WaitForConsumption</c> or greater,
/// obtained via a volatile read of the state word.
/// </summary>
public bool IsCompleted => versionAndStatus.VolatileRead().IsCompleted;
/// <summary>
/// Initializes the source in its initial version and <c>WaitForActivation</c> status.
/// </summary>
/// <param name="runContinuationsAsynchronously">
/// Passed to the continuation when it is invoked; see <c>Continuation.InvokeOnCapturedContext</c>.
/// </param>
private protected ManualResetCompletionSource(bool runContinuationsAsynchronously)
{
    versionAndStatus = new VersionAndStatus();

    // The delegate is created once here and reused for every token/timeout registration.
    cancellationCallback = CancellationRequested;
    this.runContinuationsAsynchronously = runContinuationsAsynchronously;
}
/// <summary>
/// Callback registered with the cancellation token and the timeout source. Completes the source
/// as timed out or canceled if it is still activated for the version captured at registration.
/// </summary>
/// <param name="expectedVersion">The boxed <see cref="short"/> version captured at registration.</param>
/// <param name="token">The token that triggered the callback.</param>
[MethodImpl(MethodImplOptions.NoInlining)]
private void CancellationRequested(object expectedVersion, CancellationToken token)
{
    // Unlocked pre-check: nothing to do (and no lock taken) unless the source is activated.
    if (versionAndStatus.Status != ManualResetCompletionSourceStatus.Activated)
    {
        return;
    }

    bool completed;
    EnterLock();
    try
    {
        // Re-check under the lock, then verify the callback belongs to the current version.
        completed = versionAndStatus.Status == ManualResetCompletionSourceStatus.Activated
            && versionAndStatus.Version == (short)expectedVersion
            && (state.IsTimeoutToken(token) ? CompleteAsTimedOut() : CompleteAsCanceled(token));
    }
    finally
    {
        ExitLock();
    }

    // The continuation is resumed only after the lock has been released.
    if (completed)
    {
        Resume();
    }
}
/// <summary>
/// Completes the source because the timeout elapsed (or a zero timeout was supplied to <c>Activate</c>).
/// Callers in this class invoke it while holding the lock.
/// </summary>
/// <returns>
/// A flag that makes <c>CancellationRequested</c> call <c>Resume</c> when <see langword="true"/>.
/// NOTE(review): presumably indicates that a continuation is waiting - confirm in the derived types.
/// </returns>
private protected abstract bool CompleteAsTimedOut();
/// <summary>
/// Completes the source because <paramref name="token"/> was canceled.
/// Callers in this class invoke it while holding the lock.
/// </summary>
/// <param name="token">The canceled token.</param>
/// <returns>
/// A flag that makes <c>CancellationRequested</c> call <c>Resume</c> when <see langword="true"/>.
/// NOTE(review): presumably indicates that a continuation is waiting - confirm in the derived types.
/// </returns>
private protected abstract bool CompleteAsCanceled(CancellationToken token);
/// <summary>
/// Extension point called by <c>Reset</c> and <c>TryReset</c> after the state has been reset and
/// the lock has been released. The default implementation does nothing.
/// </summary>
protected virtual void CleanUp()
{
}
// Shared reset logic; the callers in this class hold the lock while calling it.
// Advances the version, clears the completion data and hands back the detached cancellation
// state so the caller can dispose it once the lock is released.
private CancellationState ResetCore(out short token)
{
    short nextVersion = versionAndStatus.Reset();
    CompletionData = null;

    CancellationState detached = state.Detach();
    token = nextVersion;
    return detached;
}
/// <summary>
/// Resets the source so it can be activated again, waiting for the lock if necessary.
/// </summary>
/// <returns>The new version (completion token) of the source.</returns>
public short Reset()
{
    CancellationState detached;
    short token;

    EnterLock();
    try
    {
        detached = ResetCore(out token);
    }
    finally
    {
        // Always release the lock, even if resetting the timeout source throws;
        // otherwise every later EnterLock() caller would wait forever.
        ExitLock();
    }

    // Disposal and the CleanUp hook run outside the lock.
    detached.Dispose();
    CleanUp();
    return token;
}
/// <summary>
/// Attempts to reset the source without waiting for the lock.
/// </summary>
/// <param name="token">
/// The new version (completion token) on success; <c>0</c> when the lock could not be acquired.
/// </param>
/// <returns>
/// <see langword="true"/> if the lock was acquired and the source was reset; otherwise, <see langword="false"/>.
/// </returns>
public bool TryReset(out short token)
{
    if (!TryEnterLock())
    {
        token = 0;
        return false;
    }

    CancellationState detached;
    try
    {
        detached = ResetCore(out token);
    }
    finally
    {
        // Always release the lock, even if resetting the timeout source throws;
        // otherwise every later EnterLock() caller would wait forever.
        ExitLock();
    }

    // Disposal and the CleanUp hook run outside the lock.
    detached.Dispose();
    CleanUp();
    return true;
}
/// <summary>
/// Extension point with an empty default implementation.
/// </summary>
/// <remarks>
/// NOTE(review): no caller is visible in this file; presumably derived types invoke it after the
/// result has been consumed - confirm.
/// </remarks>
protected virtual void AfterConsumed()
{
}
/// <summary>
/// Releases the cancellation tracking of the current activation and runs the stored
/// continuation, if there is one.
/// </summary>
internal void Resume()
{
    state.Detach().Dispose();

    Continuation pending = continuation;
    if (!pending.IsValid)
    {
        return;
    }

    // The field is cleared before the local copy is invoked.
    continuation = default(Continuation);
    pending.InvokeOnCapturedContext(runContinuationsAsynchronously);
}
/// <summary>
/// Stores the completion data and switches the status to <c>WaitForConsumption</c>.
/// </summary>
/// <param name="completionData">The value to store in <c>CompletionData</c>.</param>
/// <returns><see langword="true"/> if a continuation is currently stored; otherwise, <see langword="false"/>.</returns>
private protected bool SetResult(object completionData)
{
// The data is written before the status is changed.
CompletionData = completionData;
versionAndStatus.Status = ManualResetCompletionSourceStatus.WaitForConsumption;
return continuation.IsValid;
}
// Attaches a continuation for the given token.
//  - status WaitForConsumption: the continuation runs right away (after the lock is released);
//  - status Activated: the continuation is stored for a later Resume();
//  - any other status, or a token that does not match the current version:
//    InvalidOperationException is thrown after the lock is released.
private void OnCompleted([In] [IsReadOnly] ref Continuation continuation, short token)
{
    EnterLock();

    string message;
    if (token == versionAndStatus.Version)
    {
        ManualResetCompletionSourceStatus status = versionAndStatus.Status;

        if (status == ManualResetCompletionSourceStatus.WaitForConsumption)
        {
            ExitLock();
            continuation.InvokeOnCurrentContext(runContinuationsAsynchronously);
            return;
        }

        if (status == ManualResetCompletionSourceStatus.Activated)
        {
            this.continuation = continuation;
            ExitLock();
            return;
        }

        message = ExceptionMessages.InvalidSourceState;
    }
    else
    {
        message = ExceptionMessages.InvalidSourceToken;
    }

    ExitLock();
    throw new InvalidOperationException(message);
}
/// <summary>
/// Wraps the callback into a <c>Continuation</c> (capturing the contexts requested by
/// <paramref name="flags"/>) and attaches it for the given token.
/// </summary>
/// <param name="continuation">The callback to run on completion.</param>
/// <param name="state">The argument passed to the callback.</param>
/// <param name="token">The version the caller expects the source to have.</param>
/// <param name="flags">Controls which contexts are captured.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private protected void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
    Continuation wrapped = new Continuation(continuation, state, flags);
    OnCompleted(ref wrapped, token);
}
/// <summary>
/// Attempts to complete the source with an exception and no completion data.
/// </summary>
/// <param name="e">The exception to complete with.</param>
/// <returns>The result of the two-argument overload called with <see langword="null"/> completion data.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
[NullableContext(1)]
public bool TrySetException(Exception e) => TrySetException(null, e);
/// <summary>
/// Attempts to complete the source with an exception.
/// </summary>
/// <param name="completionData">Optional data to associate with the completion; may be <see langword="null"/>.</param>
/// <param name="e">The exception to complete with.</param>
/// <returns>
/// NOTE(review): presumably <see langword="true"/> when the source was completed by this call - confirm in the derived types.
/// </returns>
[NullableContext(1)]
public abstract bool TrySetException([Nullable(2)] object completionData, Exception e);
/// <summary>
/// Attempts to complete the source as canceled with no completion data.
/// </summary>
/// <param name="token">The token associated with the cancellation.</param>
/// <returns>The result of the two-argument overload called with <see langword="null"/> completion data.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TrySetCanceled(CancellationToken token) => TrySetCanceled(null, token);
/// <summary>
/// Attempts to complete the source as canceled.
/// </summary>
/// <param name="completionData">Optional data to associate with the completion; may be <see langword="null"/>.</param>
/// <param name="token">The token associated with the cancellation.</param>
/// <returns>
/// NOTE(review): presumably <see langword="true"/> when the source was completed by this call - confirm in the derived types.
/// </returns>
public abstract bool TrySetCanceled(object completionData, CancellationToken token);
/// <summary>
/// Activates the source: starts tracking the timeout and the cancellation token, or completes
/// the source right away when the timeout is zero or the token is already canceled.
/// </summary>
/// <param name="timeout">
/// Zero completes the source as timed out immediately; a value of -10000 ticks (-1 ms) is accepted
/// and arms no timer; any other negative value, or a value above 21474836470000 ticks
/// (<see cref="int.MaxValue"/> ms), is rejected.
/// </param>
/// <param name="token">The token to track.</param>
/// <returns>
/// The current version when the status was <c>WaitForActivation</c> or <c>WaitForConsumption</c>;
/// otherwise, <see langword="null"/>.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is outside the accepted range.</exception>
private protected short? Activate(TimeSpan timeout, CancellationToken token)
{
    long ticks = timeout.Ticks;
    bool outOfRange = (ticks < 0 && ticks != -10000) || ticks > 21474836470000;
    if (outOfRange)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout));
    }

    EnterLock();
    try
    {
        ManualResetCompletionSourceStatus status = versionAndStatus.Status;

        if (status == ManualResetCompletionSourceStatus.WaitForActivation)
        {
            if (timeout == default(TimeSpan))
            {
                CompleteAsTimedOut();
            }
            else
            {
                state.Initialize(ref versionAndStatus, cancellationCallback, timeout, token);

                // Checked after Initialize has marked the source as activated.
                if (token.IsCancellationRequested)
                {
                    CompleteAsCanceled(token);
                }
            }

            return versionAndStatus.Version;
        }

        if (status == ManualResetCompletionSourceStatus.WaitForConsumption)
        {
            return versionAndStatus.Version;
        }

        return null;
    }
    finally
    {
        ExitLock();
    }
}
// Acquires the lock. The flag is atomically set to 1; if it was already 1 (held by someone else),
// the compiler-generated Contention helper is called with a reference to the flag.
// NOTE(review): the helper's body is not part of this listing - presumably it spins until the
// lock is obtained; confirm against the original source.
private protected void EnterLock()
{
ref int location = ref lockState;
if (Interlocked.Exchange(ref location, 1) == 1)
<EnterLock>g__Contention|36_0(ref location);
}
// Single non-blocking attempt to take the lock: atomically writes 1 into the flag and
// reports success only when the previous value was 0 (lock was free).
private bool TryEnterLock() => Interlocked.Exchange(ref lockState, 1) == 0;
// Releases the lock by writing 0 into the flag with a volatile write.
private protected void ExitLock() => Volatile.Write(ref lockState, 0);
// Debug-only check: because of the Conditional attribute, calls to this method are compiled
// only when the DEBUG symbol is defined.
// NOTE(review): the body is empty in this listing - presumably the assertion itself was
// compiled out of the build this listing comes from; confirm against the original source.
[Conditional("DEBUG")]
private protected void AssertLocked()
{
}
}
}