我正在实现一个并发上限的辅助引擎。我正在使用信号灯,直到并发降至最大数量以下,然后使用Task.Factory.StartNew
将异步处理程序包装在try
/ catch
中,并使用finally
释放信号灯。
我意识到这会在线程池上创建线程-但是我的问题是,当那些任务运行线程中的一个实际上正在等待时(在真正的IO调用或等待句柄上)时,线程是否返回池中,正如我希望的那样将是?
如果在工作处理程序是异步方法(返回Task
)的情况下,有更好的方法来实现并发性受限的任务计划程序,我也很乐意听到。或者,理想情况下,如果有一种方法可以使异步方法排队(同样,这是一个Task
返回的异步方法),其感觉比将其包装在同步委托中并传递给Task.Factory.StartNew
的感觉要小。看起来很完美..?
(这也使我认为这里有两种并行性:总体上正在处理多少个任务,以及同时在不同线程上运行的连续性是多少。虽然这不是固定的要求,但对于两者都具有可配置的选项可能很酷。 ..)
编辑:片段:
concurrencySemaphore.Wait(cancelToken);
deferRelease = false;
try
{
var result = GetWorkItem();
if (result == null)
{ // no work, wait for new work or exit signal
signal = WaitHandle.WaitAny(signals);
continue;
}
deferRelease = true;
tasks.Add(Task.Factory.StartNew(() =>
{
try
{
DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
}
finally
{
concurrencySemaphore.Release();
}
}, cancelToken));
}
finally
{
if (!deferRelease)
{
concurrencySemaphore.Release();
}
}
参考方案
这是一个TaskWorker的示例,该示例不会产生无数个工作线程。
魔术是通过等待SemaphoreSlim.WaitAsync()
来完成的,这是一个IO任务(并且没有线程)。
class TaskWorker
{
private readonly SemaphoreSlim _semaphore;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cancellationToken);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
await _semaphore.WaitAsync(cancellationToken);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
和一个简单的控制台应用程序进行测试
class Program
{
static void Main(string[] args)
{
var worker = new TaskWorker(1);
var cts = new CancellationTokenSource();
var token = cts.Token;
var tasks = Enumerable.Range(1, 10)
.Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
.ToArray();
Task.WhenAll(tasks).GetAwaiter().GetResult();
}
static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
{
Console.WriteLine($"Some Started {id}");
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
Console.WriteLine($"Some Finished {id}");
}
}
更新资料
TaskWorker
实施IDisposable
class TaskWorker : IDisposable
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly SemaphoreSlim _semaphore;
private readonly int _maxDegreeOfParallelism;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cts.Token);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
await _semaphore.WaitAsync(cts.Token);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
private void ThrowIfDisposed()
{
if (disposedValue)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
}
#region IDisposable Support
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_cts.Cancel();
// consume all semaphore slots
for (int i = 0; i < _maxDegreeOfParallelism; i++)
{
_semaphore.WaitAsync().GetAwaiter().GetResult();
}
_semaphore.Dispose();
_cts.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
#endregion
}
Task.IsCancelled不起作用 - c#我有以下示例代码:static class Program { static void Main() { var cts = new CancellationTokenSource(); var task = Task.Factory.StartNew( () => { try { Console.WriteLine("Task: Runni…
检查类型参数是否为特定接口 - java我正在编写一个工厂类,如下所示:public class RepositoryFactory<T> { public T getRepository(){ if(T is IQuestionRepository){ // This is where I am not sure return new QuestionRepository(); } …
从具有返回类型Task的方法返回值 - c#我可能在这里错过了一些东西。语法不正确。static Task<int> MathOperation(int number) { //return new Task(new Func(TestMethod(number))); } static int LongRunningMethod(int number) { // some long ru…
创建芹菜任务的不同方法之间的区别 - python通过查看创建芹菜任务的不同方法,我感到非常困惑。从表面上看,它们都是一样的,所以,有人可以解释一下两者之间的区别是什么。1。from myproject.tasks import app @app.task def foo(): pass 2。from celery import task @task def foo(): pass 3。from celer…
在Spring中实例化一个新线程以定期执行任务 - java我有一个用Spring制作的仪表板,它必须控制某些任务的执行。基本思想是有一个线程将该任务定期发送到远程跟踪器。如何实例化该线程?我已经阅读了一些,有人说使用线程不是一个好主意。这会导致Spring生命周期出现问题吗?还有另一种方法可以使方法定期调用吗? 参考方案 Spring支持任务计划。在此处查找更多信息:http://static.springsour…