任务计划程序:在Task.Factory.StartNew中等待时,线程是否返回到池中? - c#

我正在实现一个并发上限的辅助引擎。我正在使用信号灯,直到并发降至最大数量以下,然后使用Task.Factory.StartNew将异步处理程序包装在try / catch中,并使用finally释放信号灯。

我意识到这会在线程池上创建线程-但是我的问题是,当那些任务运行线程中的一个实际上正在等待时(在真正的IO调用或等待句柄上)时,线程是否返回池中,正如我希望的那样将是?

如果在工作处理程序是异步方法(返回Task)的情况下,有更好的方法来实现并发性受限的任务计划程序,我也很乐意听到。或者,理想情况下,如果有一种方法可以使异步方法排队(同样,这是一个Task返回的异步方法),其感觉比将其包装在同步委托中并传递给Task.Factory.StartNew的感觉要小。看起来很完美..?

(这也使我认为这里有两种并行性:总体上正在处理多少个任务,以及同时在不同线程上运行的连续性是多少。虽然这不是固定的要求,但对于两者都具有可配置的选项可能很酷。 ..)

编辑:片段:

                    concurrencySemaphore.Wait(cancelToken);
                    deferRelease = false;
                    try
                    {
                        var result = GetWorkItem();
                        if (result == null)
                        { // no work, wait for new work or exit signal
                            signal = WaitHandle.WaitAny(signals);
                            continue;
                        }

                        deferRelease = true;
                        tasks.Add(Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
                            }
                            finally
                            {
                                concurrencySemaphore.Release();
                            }
                        }, cancelToken));
                    }
                    finally
                    {
                        if (!deferRelease)
                        {
                            concurrencySemaphore.Release();
                        }
                    }

参考方案

这是一个TaskWorker的示例,该示例不会产生无数个工作线程。

魔术是通过等待SemaphoreSlim.WaitAsync()来完成的,这是一个IO任务(并且没有线程)。

class TaskWorker
{
    private readonly SemaphoreSlim _semaphore;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        // No ConfigureAwait(false) here to keep the SyncContext if any
        // for the real task
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            return await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }
}

和一个简单的控制台应用程序进行测试

class Program
{
    static void Main(string[] args)
    {
        var worker = new TaskWorker(1);
        var cts = new CancellationTokenSource();
        var token = cts.Token;

        var tasks = Enumerable.Range(1, 10)
            .Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
            .ToArray();

        Task.WhenAll(tasks).GetAwaiter().GetResult();
    }

    static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Some Started {id}");
        await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
        Console.WriteLine($"Some Finished {id}");
    }
}

更新资料

TaskWorker实施IDisposable

class TaskWorker : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxDegreeOfParallelism;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            // No ConfigureAwait(false) here to keep the SyncContext if any
            // for the real task
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                return await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    private void ThrowIfDisposed()
    {
        if (disposedValue)
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #region IDisposable Support
    private bool disposedValue = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                _cts.Cancel();
                // consume all semaphore slots
                for (int i = 0; i < _maxDegreeOfParallelism; i++)
                {
                    _semaphore.WaitAsync().GetAwaiter().GetResult();
                }
                _semaphore.Dispose();
                _cts.Dispose();
            }
            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
    }
    #endregion
}

Task.IsCancelled不起作用 - c#

我有以下示例代码:static class Program { static void Main() { var cts = new CancellationTokenSource(); var task = Task.Factory.StartNew( () => { try { Console.WriteLine("Task: Runni…

检查类型参数是否为特定接口 - java

我正在编写一个工厂类,如下所示:public class RepositoryFactory<T> { public T getRepository(){ if(T is IQuestionRepository){ // This is where I am not sure return new QuestionRepository(); } …

从具有返回类型Task的方法返回值 - c#

我可能在这里错过了一些东西。语法不正确。static Task<int> MathOperation(int number) { //return new Task(new Func(TestMethod(number))); } static int LongRunningMethod(int number) { // some long ru…

创建芹菜任务的不同方法之间的区别 - python

通过查看创建芹菜任务的不同方法,我感到非常困惑。从表面上看,它们都是一样的,所以,有人可以解释一下两者之间的区别是什么。1。from myproject.tasks import app @app.task def foo(): pass 2。from celery import task @task def foo(): pass 3。from celer…

在Spring中实例化一个新线程以定期执行任务 - java

我有一个用Spring制作的仪表板,它必须控制某些任务的执行。基本思想是有一个线程将该任务定期发送到远程跟踪器。如何实例化该线程?我已经阅读了一些,有人说使用线程不是一个好主意。这会导致Spring生命周期出现问题吗?还有另一种方法可以使方法定期调用吗? 参考方案 Spring支持任务计划。在此处查找更多信息:http://static.springsour…