Skip to content

Instantly share code, notes, and snippets.

@acple
Last active January 20, 2017 05:33
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save acple/1ff426cdfeca1cf358b082e816be7578 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace AsyncScheduler
{
public class ActionQueueScheduler : IDisposable
{
private readonly ConcurrentQueue<Func<Task>> queue;
private readonly ConcurrentQueue<Func<Task>> immediate;
private readonly CancellationTokenSource canceller;
private int state; // [ 0: stopped, 1: setup, 2: running ]
private Task runner;
public ActionQueueScheduler()
{
this.queue = new ConcurrentQueue<Func<Task>>();
this.immediate = new ConcurrentQueue<Func<Task>>();
this.canceller = new CancellationTokenSource();
}
public Task Add(Action<CancellationToken> action)
=> this.Add(token => { action(token); return default(object); });
public Task<T> Add<T>(Func<CancellationToken, T> action)
{
if (this.canceller.IsCancellationRequested)
return Task.FromCanceled<T>(this.canceller.Token);
var tcs = new TaskCompletionSource<T>();
this.queue.Enqueue(() => this.InvokeAction(action, tcs));
this.Run();
return tcs.Task;
}
public Task Add(Func<CancellationToken, Task> action)
=> this.Add(async token => { await action(token); return default(object); });
public Task<T> Add<T>(Func<CancellationToken, Task<T>> action)
{
if (this.canceller.IsCancellationRequested)
return Task.FromCanceled<T>(this.canceller.Token);
var tcs = new TaskCompletionSource<T>();
this.queue.Enqueue(() => this.InvokeActionAsync(action, tcs));
this.Run();
return tcs.Task;
}
public Task Interrupt(Action<CancellationToken> action)
=> this.Interrupt(token => { action(token); return default(object); });
public Task<T> Interrupt<T>(Func<CancellationToken, T> action)
{
if (this.canceller.IsCancellationRequested)
return Task.FromCanceled<T>(this.canceller.Token);
var tcs = new TaskCompletionSource<T>();
this.immediate.Enqueue(() => this.InvokeAction(action, tcs));
this.Run();
return tcs.Task;
}
public Task Interrupt(Func<CancellationToken, Task> action)
=> this.Interrupt(async token => { await action(token); return default(object); });
public Task<T> Interrupt<T>(Func<CancellationToken, Task<T>> action)
{
if (this.canceller.IsCancellationRequested)
return Task.FromCanceled<T>(this.canceller.Token);
var tcs = new TaskCompletionSource<T>();
this.immediate.Enqueue(() => this.InvokeActionAsync(action, tcs));
this.Run();
return tcs.Task;
}
private Task InvokeAction<T>(Func<CancellationToken, T> action, TaskCompletionSource<T> tcs)
{
try
{
this.canceller.Token.ThrowIfCancellationRequested();
tcs.TrySetResult(action(this.canceller.Token));
}
catch (OperationCanceledException exception)
{
tcs.TrySetCanceled(exception.CancellationToken);
}
catch (Exception exception)
{
tcs.TrySetException(exception);
}
return Task.FromResult(default(object));
}
private async Task InvokeActionAsync<T>(Func<CancellationToken, Task<T>> asyncAction, TaskCompletionSource<T> tcs)
{
try
{
this.canceller.Token.ThrowIfCancellationRequested();
tcs.TrySetResult(await asyncAction(this.canceller.Token).ConfigureAwait(false));
}
catch (OperationCanceledException exception)
{
tcs.TrySetCanceled(exception.CancellationToken);
}
catch (Exception exception)
{
tcs.TrySetException(exception);
}
}
private Task Run()
{
while (Interlocked.CompareExchange(ref this.state, 1, 0) != 0)
if (this.state == 2)
return this.runner;
this.runner = Task.Run(async () =>
{
var action = default(Func<Task>);
while (true)
{
if (this.immediate.TryDequeue(out action))
await action().ConfigureAwait(false);
else if (this.queue.TryDequeue(out action))
await action().ConfigureAwait(false);
else
break;
}
while (Interlocked.CompareExchange(ref this.state, 0, 2) != 2) ;
if (this.queue.Count != 0)
await this.Run().ConfigureAwait(false);
});
Interlocked.Exchange(ref this.state, 2);
return this.runner;
}
public void Dispose()
{
this.canceller.Cancel();
this.Run().Wait();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment