当列表可以附加其他任务时用于等待Task.WhenAny(List <T>)的适当模式 - c#

等待更改中的List<Task>是不可能的,因为Task.WhenAny(List<Task>)获取List<Task>的副本。

什么是合适的模式

List<Task> taskList = new List<Task>();

await Task.WhenAny(taskList);

在第一个WhenAny被调用之后,何时taskList可以添加其他任务?

下面的完整演示代码演示了该问题。

    static readonly List<Task<int>> taskList = new List<Task<int>>();
    static readonly Random rnd = new Random(1);

    static async Task<int> RunTaskAsync(int taskID,int taskDuration)
    {
        await Task.Yield();
        Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
        await Task.Delay(taskDuration);  // mimic some work
        return taskID;
    }
    static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
    {
        // Add numTasks asyncronously to the taskList
        // First task is added Syncronously and then we yield the adds to a worker

        taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
        await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task

        // remaing task run's are Yielded to a worker thread
        for (int i = 2; i <= numTasks; i++)
        {
            await Task.Delay(rnd.Next(minDelay, maxDelay));
            taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
        }
    }
    static async Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch(); sw.Start();

        // Start a Fire and Forget Task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);

        // while there are tasks to complete use the main thread to process them as they comeplete
        while(taskList.Count > 0)
        {
            var t = await Task.WhenAny(taskList);
            taskList.Remove(t);
            var i = await t;
            Console.WriteLine("Task {0} found to be completed at: {1}",i,sw.Elapsed);
        }

        // All tasks have completed sucessfully - exit main thread
    }

控制台输出,仅在找到并删除60秒任务后,WhenAny()循环才发现所有其他任务已完成。

Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Starting Task: 3 with a duration of 24 seconds
Starting Task: 4 with a duration of 15 seconds
Starting Task: 5 with a duration of 28 seconds
Starting Task: 6 with a duration of 21 seconds
Starting Task: 7 with a duration of 11 seconds
Starting Task: 8 with a duration of 29 seconds
Starting Task: 9 with a duration of 21 seconds
Starting Task: 10 with a duration of 20 seconds
Task 1 found to be completed at: 00:01:00.1305811
Task 2 found to be completed at: 00:01:00.1312951
Task 3 found to be completed at: 00:01:00.1315689
Task 4 found to be completed at: 00:01:00.1317623
Task 5 found to be completed at: 00:01:00.1319427
Task 6 found to be completed at: 00:01:00.1321225
Task 7 found to be completed at: 00:01:00.1323002
Task 8 found to be completed at: 00:01:00.1324379
Task 9 found to be completed at: 00:01:00.1325962
Task 10 found to be completed at: 00:01:00.1327377

谢谢!

参考方案

您显示的代码有问题,即在工作人员和任务创建者之间没有明智的沟通渠道。您需要某种消息传递机制来将新任务(以及没有更多任务的情况)通知工作人员,以便它可以对它做出反应。这是您必须为并发系统确定的事情,确切的实现与该问题有关,因此,我假设我们的工作程序中有OnTaskAdded(Task task)OnEnd()方法。

用您的话说,您不想真正地等到任何任务完成,而是希望每个任务在完成时都执行一些操作。请参阅下面的更新的答案。

这可以通过ContinueWith实现:

class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();

    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.

    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = task.ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
        _tasks.Add(taskWithContinuation);
    }

    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }
}

编辑:
经过所有关于确保合理的消息传递管道的讨论之后,我认为我实际上可以给您一个快速而又肮脏的实现,以便您可以看到它的工作原理:

// DISCLAIMER: NOT PRODUCTION CODE!!!
public static async Task Main()
{
    Stopwatch sw = new Stopwatch(); sw.Start();

    // Start a Fire and Forget Task to create some running tasks
    var _ = AddTasksAsync(10, 1, 3000);
    var internalList = new List<Task>();

    // while there are tasks to complete use the main thread to process them as they comeplete
    var i = 0;
    while (i < 10)
    {
        while (taskList.Count <= i)
        {
            // No new tasks, check again after a delay -- THIS IS VERY BAD!
            await Task.Delay(100);
        }
        Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
        var taskWithContinuation = taskList[i].ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
        internalList.Add(taskWithContinuation);
        ++i;
    }
    await Task.WhenAll(internalList);
}

让我再次强调:这不是生产质量代码!积极地等待更多任务,嗯。它的输出是这样的:

Task 1 intercepted at: 00:00:00.0495570
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8459622
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2626124
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2257285
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3058738
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6376981
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7507146
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7107754
Task 2 found to be completed at: 00:00:12.8111589
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7883430
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.6707959
Task 7 found to be completed at: 00:00:21.6692276
Task 4 found to be completed at: 00:00:24.2125638
Task 3 found to be completed at: 00:00:31.2276640
Task 6 found to be completed at: 00:00:31.5908324
Task 10 found to be completed at: 00:00:34.5585143
Task 9 found to be completed at: 00:00:34.7053864
Task 5 found to be completed at: 00:00:38.2616534
Task 8 found to be completed at: 00:00:40.6372696
Task 1 found to be completed at: 00:01:00.0720695

您可以看到由于多线程工作的本质,行有些乱七八糟,但时间戳是准确的。

更新:

好吧,我很笨,我刚刚邀请您加入反模式。 Using ContinueWith is dangerous,而且它过于复杂-引入了async / await可以使我们免于手动安排连续性。您只需用Task<int>包装await并记录时间即可。

class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();

    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.

    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = ContinueWithLog(task);
        _tasks.Add(taskWithContinuation);
    }

    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }

    private Task ContinueWithLog(Task<int> task)
    {
        var i = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }
}

使用示例代码进行快捷的PoC:

class Program
{
    static readonly List<Task<int>> taskList = new List<Task<int>>();
    static readonly Random rnd = new Random(1);
    static readonly Stopwatch sw = new Stopwatch();

    static async Task<int> RunTaskAsync(int taskID, int taskDuration)
    {
        await Task.Yield();
        Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
        await Task.Delay(taskDuration);  // mimic some work
        return taskID;
    }
    static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
    {
        // Add numTasks asyncronously to the taskList
        // First task is added Syncronously and then we yield the adds to a worker

        taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
        await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task

        // remaing task run's are Yielded to a worker thread
        for (int i = 2; i <= numTasks; i++)
        {
            await Task.Delay(rnd.Next(minDelay, maxDelay));
            taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
        }
    }

    public static async Task ContinueWithLog(Task<int> source)
    {
        var i = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }

    public static async Task Main()
    {
        sw.Start();

        // Start a Fire and Forget Task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);
        var internalList = new List<Task>();

        // while there are tasks to complete use the main thread to process them as they comeplete
        var i = 0;
        while (i < 10)
        {
            while (taskList.Count <= i)
            {
                // No new tasks, check again after a delay -- THIS IS VERY BAD!
                await Task.Delay(100);
            }
            Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
            internalList.Add(ContinueWithLog(taskList[i]));
            ++i;
        }
        await Task.WhenAll(internalList);
    }
}

输出:

Starting Task: 1 with a duration of 60 seconds
Task 1 intercepted at: 00:00:00.0525006
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8551382
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2687049
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2404507
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3325019
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6654663
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7809841
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7576237
Task 2 found to be completed at: 00:00:12.8151955
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7228579
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.5829039
Task 7 found to be completed at: 00:00:21.6848699
Task 4 found to be completed at: 00:00:24.2089671
Task 3 found to be completed at: 00:00:31.2300136
Task 6 found to be completed at: 00:00:31.5847257
Task 10 found to be completed at: 00:00:34.5550722
Task 9 found to be completed at: 00:00:34.6904076
Task 5 found to be completed at: 00:00:38.2835777
Task 8 found to be completed at: 00:00:40.6445029
Task 1 found to be completed at: 00:01:00.0826952

这是实现所需目标的惯用方式。很抱歉首先误导您使用ContinueWith,这是不必要的并且容易出错,现在我们都知道。

将List <List <string >>转换为List <string> - c#

This question already has answers here: Closed 9 years ago. Possible Duplicate: Linq: List of lists to a long list我已经使用LINQ进行了转换。List<List<string>>至List<string>.如…

List <List>混乱 - c#

我的代码段List<List<optionsSort>> stocks = new List<List<optionsSort>>(); optionsSort tempStock1 = new optionsSort(); List<optionsSort> stock = new List<…

List <Dog>是List <Animal>的子类吗?为什么Java泛型不是隐式多态的? - java

我对Java泛型如何处理继承/多态感到困惑。假设以下层次结构-动物(父母)狗-猫(儿童)因此,假设我有一个方法doSomething(List<Animal> animals)。根据继承和多态性的所有规则,我假设List<Dog>是List<Animal>,而List<Cat>是List<Animal&g…

Java中的<<或>>>是什么意思? - java

This question already has answers here: Closed 7 years ago. Possible Duplicate: What does >> and >>> mean in Java?我在一些Java代码中遇到了一些陌生的符号,尽管代码可以正确编译和运行,但对于括号在此代码中的作用却感…

C#等效于Java List <?扩展类> - c#

我有泛型类的基本结构public class Parent<T> where T : Parent<T> { Action<T> Notify; } public class Child : Parent<Child> { } 我想要一个列表,以便可以将Child对象放在此处List<Parent>…