DotNext.Threading by .NET Foundation and Contributors

<PackageReference Include="DotNext.Threading" Version="5.12.1" />

 ManualResetCompletionSource

public abstract class ManualResetCompletionSource
Represents the base class for producers of value tasks.
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace DotNext.Threading.Tasks
{
    /// <summary>
    /// Represents the base class for producers of value tasks.
    /// </summary>
    /// <remarks>
    /// The source keeps a combined version/status word, an optional continuation and the
    /// cancellation/timeout bookkeeping. Mutations are serialized through a simple lock flag
    /// (<see cref="EnterLock"/>/<see cref="ExitLock"/>).
    /// NOTE(review): <c>[NullableContext]</c>/<c>[Nullable]</c> are compiler-emitted nullable metadata that
    /// the decompiler surfaced as attributes; they are kept verbatim so the annotations are not lost.
    /// </remarks>
    [NullableContext(2)]
    [Nullable(0)]
    public abstract class ManualResetCompletionSource
    {
        /// <summary>
        /// Captures a continuation callback together with the contexts requested through
        /// <see cref="ValueTaskSourceOnCompletedFlags"/>.
        /// </summary>
        [StructLayout(LayoutKind.Auto)]
        private readonly struct Continuation : IThreadPoolWorkItem
        {
            private readonly Action<object> action;
            private readonly object state;
            private readonly object schedulingContext;
            private readonly ExecutionContext context;

            /// <summary>Gets a value indicating whether a callback is attached (a default instance is not valid).</summary>
            public bool IsValid => action != null;

            /// <summary>Creates a continuation and captures the contexts selected by <paramref name="flags"/>.</summary>
            /// <param name="action">The callback to invoke.</param>
            /// <param name="state">The argument passed to <paramref name="action"/>.</param>
            /// <param name="flags">Selects whether the scheduling context and/or the execution context are captured.</param>
            public Continuation(Action<object> action, object state, ValueTaskSourceOnCompletedFlags flags)
            {
                this.action = action;
                this.state = state;

                // NOTE(review): the callee is a compiler-generated local function whose body is not part of
                // this listing; presumably it returns the current SynchronizationContext or TaskScheduler - confirm.
                schedulingContext = (flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0
                    ? <.ctor>g__CaptureSchedulingContext|4_0()
                    : null;

                context = (flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0
                    ? ExecutionContext.Capture()
                    : null;
            }

            /// <summary>
            /// Invokes the callback from the thread that attaches it; used when the source is already completed.
            /// </summary>
            /// <param name="runAsynchronously"><see langword="true"/> to queue the callback to the thread pool instead of calling it inline.</param>
            public void InvokeOnCurrentContext(bool runAsynchronously)
            {
                if (schedulingContext != null)
                {
                    Invoke();
                }
                else if (!runAsynchronously)
                {
                    action(state);
                }
                else if (context != null)
                {
                    // An execution context was requested: use the non-Unsafe overload.
                    ThreadPool.QueueUserWorkItem(action, state, true);
                }
                else
                {
                    ThreadPool.UnsafeQueueUserWorkItem(action, state, true);
                }
            }

            /// <summary>
            /// Invokes the callback using the contexts captured by the constructor; used when the source completes later.
            /// </summary>
            /// <param name="runAsynchronously"><see langword="true"/> to queue the callback to the thread pool instead of calling it inline.</param>
            public void InvokeOnCapturedContext(bool runAsynchronously)
            {
                if (schedulingContext != null)
                {
                    InvokeOnSchedulingContext();
                    return;
                }

                bool hasExecutionContext = context != null;
                if (runAsynchronously)
                {
                    if (hasExecutionContext)
                    {
                        // Queue this struct as a work item; Execute() restores the captured context first.
                        ThreadPool.UnsafeQueueUserWorkItem(this, true);
                    }
                    else
                    {
                        ThreadPool.UnsafeQueueUserWorkItem(action, state, true);
                    }
                }
                else if (hasExecutionContext)
                {
                    // The delegate is reinterpreted as ContextCallback without a conversion, exactly as in the original.
                    ExecutionContext.Run(context, Unsafe.As<ContextCallback>((object)action), state);
                }
                else
                {
                    action(state);
                }
            }

            // Dispatches through the scheduling context; when an execution context was captured it is
            // restored for the duration of the dispatch and the previous one is put back afterwards.
            private void InvokeOnSchedulingContext()
            {
                ExecutionContext captured = context;
                if (captured == null)
                {
                    Invoke();
                    return;
                }

                ExecutionContext previous = ExecutionContext.Capture();
                ExecutionContext.Restore(captured);
                try
                {
                    Invoke();
                }
                finally
                {
                    if (previous != null)
                    {
                        ExecutionContext.Restore(previous);
                    }
                }
            }

            // Posts the callback to the captured SynchronizationContext or starts it on the captured
            // TaskScheduler; does nothing when the scheduling context is neither of the two.
            private void Invoke()
            {
                switch (schedulingContext)
                {
                    case SynchronizationContext syncContext:
                        syncContext.Post(action.Invoke, state);
                        break;
                    case TaskScheduler scheduler:
                        Task.Factory.StartNew(action, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler);
                        break;
                }
            }

            // Thread pool entry point; only queued by InvokeOnCapturedContext when an execution context was captured.
            void IThreadPoolWorkItem.Execute()
            {
                ExecutionContext.Restore(context);
                action(state);
            }
        }

        /// <summary>
        /// Packs the version token and the status of the source into a single 64-bit word so that both
        /// can be read and compare-exchanged together.
        /// </summary>
        [StructLayout(LayoutKind.Auto)]
        private protected struct VersionAndStatus
        {
            // The version occupies the first two bytes of the word and the status the second 32-bit half
            // (see GetVersion/GetStatus); the bytes in between are always zero (see Combine).
            private ulong value;

            /// <summary>Gets the version token stored in the word.</summary>
            public short Version => GetVersion(ref value);

            /// <summary>Gets or sets the status stored in the word (plain, non-volatile access).</summary>
            public ManualResetCompletionSourceStatus Status
            {
                get => GetStatus(ref this.value);
                set => GetStatus(ref this.value) = value;
            }

            /// <summary>Gets a value indicating whether the status is <c>WaitForConsumption</c> or greater.</summary>
            public bool IsCompleted => Status >= ManualResetCompletionSourceStatus.WaitForConsumption;

            /// <summary>Initializes the word with the initial token and the <c>WaitForActivation</c> status.</summary>
            public VersionAndStatus()
                : this(InitialCompletionToken, ManualResetCompletionSourceStatus.WaitForActivation)
            {
            }

            private VersionAndStatus(short version, ManualResetCompletionSourceStatus status)
            {
                value = Combine(version, status);
            }

            /// <summary>Returns a copy of this word obtained with a volatile read.</summary>
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public VersionAndStatus VolatileRead()
            {
                VersionAndStatus snapshot = default;
                snapshot.value = Volatile.Read(ref value);
                return snapshot;
            }

            /// <summary>
            /// Determines whether the source may still be completed for the given token.
            /// </summary>
            /// <param name="token">The expected version, or <see langword="null"/> to accept any version.</param>
            /// <returns><see langword="true"/> if the status value is 0 or 1 and the token, when supplied, equals the current version.</returns>
            public bool CanBeCompleted(short? token)
            {
                short version = Version;
                ManualResetCompletionSourceStatus status = Status;

                // Presumably 0 and 1 are WaitForActivation and Activated - confirm against the enum declaration.
                return (uint)status <= 1u && token.GetValueOrDefault(version) == version;
            }

            /// <summary>
            /// Atomically moves the word from <c>WaitForConsumption</c> to <c>Consumed</c> for the given version.
            /// </summary>
            /// <param name="version">The version token presented by the consumer.</param>
            /// <exception cref="InvalidOperationException">The observed status is not <c>WaitForConsumption</c>, or the observed version differs from <paramref name="version"/>.</exception>
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public void Consume(short version)
            {
                ulong observed = Interlocked.CompareExchange(
                    ref value,
                    Combine(version, ManualResetCompletionSourceStatus.Consumed),
                    Combine(version, ManualResetCompletionSourceStatus.WaitForConsumption));

                if (GetStatus(ref observed) != ManualResetCompletionSourceStatus.WaitForConsumption)
                {
                    throw new InvalidOperationException(ExceptionMessages.InvalidSourceState);
                }

                if (GetVersion(ref observed) != version)
                {
                    throw new InvalidOperationException(ExceptionMessages.InvalidSourceToken);
                }
            }

            /// <summary>Increments the version (wrapping on overflow) and sets the status to <c>WaitForActivation</c>.</summary>
            /// <returns>The new version token.</returns>
            public short Reset()
            {
                // unchecked pins the wrap-around of the original regardless of project-level overflow settings.
                short next = unchecked((short)(GetVersion(ref value) + 1));
                value = Combine(next, ManualResetCompletionSourceStatus.WaitForActivation);
                return next;
            }

            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            private static ref short GetVersion(ref ulong value)
            {
                return ref Unsafe.As<ulong, short>(ref value);
            }

            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            private static ref ManualResetCompletionSourceStatus GetStatus(ref ulong value)
            {
                return ref Unsafe.As<int, ManualResetCompletionSourceStatus>(ref Unsafe.Add<int>(ref Unsafe.As<ulong, int>(ref value), 1));
            }

            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            private static ulong Combine(short version, ManualResetCompletionSourceStatus status)
            {
                // Start from zero so the bytes not covered by version/status are deterministic:
                // Consume() compares the whole 64-bit word with CompareExchange.
                ulong result = 0UL;
                GetVersion(ref result) = version;
                GetStatus(ref result) = status;
                return result;
            }
        }

        /// <summary>
        /// Holds the cancellation token registration and the timeout source of the current activation.
        /// </summary>
        [StructLayout(LayoutKind.Auto)]
        private struct CancellationState : IDisposable
        {
            private CancellationTokenRegistration tokenTracker;
            private CancellationTokenSource timeoutSource;

            /// <summary>
            /// Registers <paramref name="callback"/> with the token and/or a timeout and marks the source as activated.
            /// </summary>
            /// <param name="vs">The version/status word; its status is set to <c>Activated</c>.</param>
            /// <param name="callback">The callback registered with the tokens; it receives the boxed version as state.</param>
            /// <param name="timeout">The timeout; a timer is armed only when it is positive and the token is not already canceled.</param>
            /// <param name="token">The caller's cancellation token.</param>
            internal void Initialize(ref VersionAndStatus vs, Action<object, CancellationToken> callback, TimeSpan timeout, CancellationToken token)
            {
                // The boxed version is shared by both registrations when the first one happens.
                object boxedVersion = null;
                if (token.CanBeCanceled)
                {
                    boxedVersion = vs.Version;
                    tokenTracker = token.UnsafeRegister(callback, boxedVersion);
                }

                // Statement order matters: the status becomes Activated only after the token registration above.
                vs.Status = ManualResetCompletionSourceStatus.Activated;

                if (timeout > default(TimeSpan) && !token.IsCancellationRequested)
                {
                    timeoutSource ??= new CancellationTokenSource();
                    timeoutSource.Token.UnsafeRegister(callback, boxedVersion ?? vs.Version);
                    timeoutSource.CancelAfter(timeout);
                }
            }

            /// <summary>Determines whether <paramref name="token"/> is the token of the timeout source.</summary>
            internal readonly bool IsTimeoutToken(CancellationToken token)
            {
                CancellationTokenSource source = timeoutSource;
                return source != null && source.Token == token;
            }

            /// <summary>
            /// Moves the disposable parts out of this instance.
            /// </summary>
            /// <returns>
            /// A copy that owns the token registration and, when the timeout source could not be reset
            /// for reuse, the timeout source as well.
            /// </returns>
            internal CancellationState Detach()
            {
                CancellationState detached = default;
                detached.tokenTracker = tokenTracker;

                CancellationTokenSource source = timeoutSource;
                if (source != null && !source.TryReset())
                {
                    // The source cannot be reused: hand it over for disposal and forget it here.
                    detached.timeoutSource = source;
                    timeoutSource = null;
                }

                tokenTracker = default;
                return detached;
            }

            /// <summary>Unregisters the token callback and disposes the timeout source, if any.</summary>
            public readonly void Dispose()
            {
                tokenTracker.Unregister();
                timeoutSource?.Dispose();
            }
        }

        /// <summary>The version token of a freshly created source.</summary>
        protected const short InitialCompletionToken = short.MinValue;

        private readonly Action<object, CancellationToken> cancellationCallback;
        private readonly bool runContinuationsAsynchronously;
        private CancellationState state;
        private Continuation continuation;
        private protected VersionAndStatus versionAndStatus;
        private int lockState;

        /// <summary>Gets or sets the data associated with the completion; cleared on reset.</summary>
        protected object CompletionData { get; set; }

        /// <summary>Gets the current status of the source (volatile read).</summary>
        public ManualResetCompletionSourceStatus Status => versionAndStatus.VolatileRead().Status;

        /// <summary>Gets a value indicating whether the source is completed (volatile read).</summary>
        public bool IsCompleted => versionAndStatus.VolatileRead().IsCompleted;

        /// <summary>Initializes the source in its initial version and status.</summary>
        /// <param name="runContinuationsAsynchronously"><see langword="true"/> to queue continuations to the thread pool instead of invoking them inline.</param>
        private protected ManualResetCompletionSource(bool runContinuationsAsynchronously)
        {
            this.runContinuationsAsynchronously = runContinuationsAsynchronously;
            versionAndStatus = new VersionAndStatus();
            cancellationCallback = CancellationRequested;
        }

        // Callback registered with the caller's token and with the timeout token. expectedVersion is the
        // boxed version captured at activation; a stale callback (different version/status) is ignored.
        [MethodImpl(MethodImplOptions.NoInlining)]
        private void CancellationRequested(object expectedVersion, CancellationToken token)
        {
            // Cheap check outside the lock; repeated under the lock below.
            if (versionAndStatus.Status != ManualResetCompletionSourceStatus.Activated)
            {
                return;
            }

            bool completed;
            EnterLock();
            try
            {
                completed = versionAndStatus.Status == ManualResetCompletionSourceStatus.Activated
                    && versionAndStatus.Version == (short)expectedVersion
                    && (state.IsTimeoutToken(token) ? CompleteAsTimedOut() : CompleteAsCanceled(token));
            }
            finally
            {
                ExitLock();
            }

            // The continuation is resumed outside the lock.
            if (completed)
            {
                Resume();
            }
        }

        /// <summary>Completes the source as timed out; the result decides whether the continuation is resumed.</summary>
        private protected abstract bool CompleteAsTimedOut();

        /// <summary>Completes the source as canceled; the result decides whether the continuation is resumed.</summary>
        private protected abstract bool CompleteAsCanceled(CancellationToken token);

        /// <summary>Called after a reset, outside the lock.</summary>
        protected virtual void CleanUp()
        {
        }

        // Must be called with the lock held: bumps the version, clears the completion data and
        // returns the detached cancellation state for disposal outside the lock.
        private CancellationState ResetCore(out short token)
        {
            token = versionAndStatus.Reset();
            CompletionData = null;
            return state.Detach();
        }

        /// <summary>Resets the source so that it can be reused.</summary>
        /// <returns>The version token of the new incarnation.</returns>
        public short Reset()
        {
            CancellationState detached;
            short token;

            EnterLock();
            try
            {
                detached = ResetCore(out token);
            }
            finally
            {
                // Released in finally, like the other lock users, so an exception cannot leave the lock held.
                ExitLock();
            }

            detached.Dispose();
            CleanUp();
            return token;
        }

        /// <summary>Attempts to reset the source without waiting for the lock.</summary>
        /// <param name="token">The version token of the new incarnation; zero when the lock could not be taken.</param>
        /// <returns><see langword="true"/> if the source was reset; <see langword="false"/> if the lock was already held.</returns>
        public bool TryReset(out short token)
        {
            if (!TryEnterLock())
            {
                token = 0;
                return false;
            }

            CancellationState detached;
            try
            {
                detached = ResetCore(out token);
            }
            finally
            {
                ExitLock();
            }

            detached.Dispose();
            CleanUp();
            return true;
        }

        /// <summary>Extension point with an empty default implementation; it is not called within this class.</summary>
        protected virtual void AfterConsumed()
        {
        }

        /// <summary>
        /// Releases the cancellation state and invokes the attached continuation, if any, on its captured context.
        /// </summary>
        internal void Resume()
        {
            state.Detach().Dispose();

            Continuation attached = this.continuation;
            if (attached.IsValid)
            {
                // Clear the field before invoking so the continuation cannot be run twice from here.
                this.continuation = default(Continuation);
                attached.InvokeOnCapturedContext(runContinuationsAsynchronously);
            }
        }

        /// <summary>Stores the completion data and moves the status to <c>WaitForConsumption</c>.</summary>
        /// <returns><see langword="true"/> if a continuation is attached.</returns>
        private protected bool SetResult(object completionData)
        {
            CompletionData = completionData;
            versionAndStatus.Status = ManualResetCompletionSourceStatus.WaitForConsumption;
            return continuation.IsValid;
        }

        // Attaches the continuation, or invokes it right away when the source is already completed.
        // Throws InvalidOperationException on a token mismatch or an unexpected status.
        private void OnCompleted(in Continuation continuation, short token)
        {
            string message;

            EnterLock();
            if (token != versionAndStatus.Version)
            {
                message = ExceptionMessages.InvalidSourceToken;
                ExitLock();
            }
            else
            {
                switch (versionAndStatus.Status)
                {
                    case ManualResetCompletionSourceStatus.WaitForConsumption:
                        // Already completed: release the lock first, then run the callback.
                        ExitLock();
                        continuation.InvokeOnCurrentContext(runContinuationsAsynchronously);
                        return;
                    case ManualResetCompletionSourceStatus.Activated:
                        this.continuation = continuation;
                        ExitLock();
                        return;
                }

                message = ExceptionMessages.InvalidSourceState;
                ExitLock();
            }

            throw new InvalidOperationException(message);
        }

        /// <summary>Wraps the callback into a continuation and attaches it to the source.</summary>
        /// <param name="continuation">The callback to invoke on completion.</param>
        /// <param name="state">The argument passed to <paramref name="continuation"/>.</param>
        /// <param name="token">The version token of the task being awaited.</param>
        /// <param name="flags">The context capturing options.</param>
        /// <exception cref="InvalidOperationException">The token does not match the current version, or the status is neither <c>Activated</c> nor <c>WaitForConsumption</c>.</exception>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private protected void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            Continuation wrapped = new Continuation(continuation, state, flags);
            OnCompleted(in wrapped, token);
        }

        /// <summary>Attempts to complete the source with an exception and no completion data.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        [NullableContext(1)]
        public bool TrySetException(Exception e)
        {
            return TrySetException(null, e);
        }

        /// <summary>Attempts to complete the source with an exception.</summary>
        [NullableContext(1)]
        public abstract bool TrySetException([Nullable(2)] object completionData, Exception e);

        /// <summary>Attempts to complete the source as canceled with no completion data.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool TrySetCanceled(CancellationToken token)
        {
            return TrySetCanceled(null, token);
        }

        /// <summary>Attempts to complete the source as canceled.</summary>
        public abstract bool TrySetCanceled(object completionData, CancellationToken token);

        /// <summary>
        /// Activates the source with the given timeout and cancellation token.
        /// </summary>
        /// <param name="timeout">Zero, a positive value up to <see cref="int.MaxValue"/> milliseconds, or -1 millisecond.</param>
        /// <param name="token">The token that can cancel the operation.</param>
        /// <returns>
        /// The current version token when the source was waiting for activation or for consumption;
        /// otherwise <see langword="null"/>.
        /// </returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is negative and not -1 millisecond, or exceeds <see cref="int.MaxValue"/> milliseconds.</exception>
        private protected short? Activate(TimeSpan timeout, CancellationToken token)
        {
            // -10000 ticks: the tick count of Timeout.InfiniteTimeSpan (-1 ms).
            const long InfiniteTimeoutTicks = -TimeSpan.TicksPerMillisecond;

            // 21474836470000 ticks: int.MaxValue milliseconds.
            const long MaxTimeoutTicks = int.MaxValue * TimeSpan.TicksPerMillisecond;

            long ticks = timeout.Ticks;
            if ((ticks < 0L && ticks != InfiniteTimeoutTicks) || ticks > MaxTimeoutTicks)
            {
                throw new ArgumentOutOfRangeException(nameof(timeout));
            }

            EnterLock();
            try
            {
                switch (versionAndStatus.Status)
                {
                    case ManualResetCompletionSourceStatus.WaitForActivation:
                        if (timeout == default(TimeSpan))
                        {
                            // Zero timeout: complete immediately without arming anything.
                            CompleteAsTimedOut();
                        }
                        else
                        {
                            state.Initialize(ref versionAndStatus, cancellationCallback, timeout, token);
                            if (token.IsCancellationRequested)
                            {
                                CompleteAsCanceled(token);
                            }
                        }

                        goto case ManualResetCompletionSourceStatus.WaitForConsumption;
                    case ManualResetCompletionSourceStatus.WaitForConsumption:
                        return versionAndStatus.Version;
                    default:
                        return null;
                }
            }
            finally
            {
                ExitLock();
            }
        }

        /// <summary>Acquires the lock flag, delegating to the contention path when it is already taken.</summary>
        private protected void EnterLock()
        {
            ref int location = ref lockState;
            if (Interlocked.Exchange(ref location, 1) == 1)
            {
                // NOTE(review): compiler-generated local function whose body is not part of this listing;
                // presumably it waits until the flag can be acquired - confirm.
                <EnterLock>g__Contention|36_0(ref location);
            }
        }

        // Single acquisition attempt; never waits.
        private bool TryEnterLock()
        {
            return Interlocked.Exchange(ref lockState, 1) == 0;
        }

        /// <summary>Releases the lock flag with a volatile write.</summary>
        private protected void ExitLock()
        {
            Volatile.Write(ref lockState, 0);
        }

        /// <summary>Debug-only hook; calls are compiled only when DEBUG is defined, and the body is empty in this listing.</summary>
        [Conditional("DEBUG")]
        private protected void AssertLocked()
        {
        }
    }
}