C#AsyncEnumerable运行/等待多个任务从未完成 - c#

我想要一个接收Task<bool>并在X任务中运行它的函数。

为此,我编写了以下代码:

public static class RetryComponent
{
    public static async Task RunTasks(Func<Task<bool>> action, int tasks, int retries, string method)
    {
        // Running everything
        var tasksPool = Enumerable.Range(0, tasks).Select(i => DoWithRetries(action, retries, method)).ToArray();
        await Task.WhenAll(tasksPool);
    }

    private static async Task<bool> DoWithRetries(Func<Task<bool>> action, int retryCount, string method)
    {
        while (true)
        {
            if (retryCount <= 0)
                return false;

            try
            {
                bool res = await action();
                if (res)
                    return true;
            }
            catch (Exception e)
            {
                // Log it
            }

            retryCount--;
            await Task.Delay(200); // retry in 200
        }
    }
}

以及以下执行代码:

BlockingCollection<int> ints = new BlockingCollection<int>();
foreach (int i in Enumerable.Range(0, 100000))
{
    ints.Add(i);
}
ints.CompleteAdding();

int taskId = 0;
var enumerable = new AsyncEnumerable<int>(async yield =>
{
    await RetryComponent.RunTasks(async () =>
    {
        try
        {
            int myTaskId = Interlocked.Increment(ref taskId);

            // usually there are async/await operations inside the while loop, this is just an example

            while (!ints.IsCompleted)
            {
                int number = ints.Take();

                Console.WriteLine($"Task {myTaskId}: {number}");
                await yield.ReturnAsync(number);
            }
        }
        catch (InvalidOperationException)
        {
            return true;
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }

        return true;
    }, 10, 1, MethodBase.GetCurrentMethod().Name);
});

await enumerable.ForEachAsync(number =>
{
    Console.WriteLine(number);
});

其中AsyncEnumerable来自System.Collections.Async

控制台显示任务10:X(其中x是列表中的数字..)。

当我删除AsyncEnumerable时,一切都按预期工作(所有任务正在打印,执行结束)。
由于某些原因(我无法找到很多时间),使用AsyncEnumerable只会破坏所有内容(在我的主代码中,我需要使用AsyncEnumerable ..可伸缩性内容。)这意味着代码永远不会停止,而只有最后一个任务(10)正在打印。当我添加更多日志时,我看到任务1-9永远不会完成。

因此,为了澄清问题,我想让多个任务执行异步操作,并将结果产生到充当管道的单个AsyncEnumerable对象。 (这就是主意。)

参考方案

问题是枚举器/生成器模式是顺序的,但是您正在尝试创建多生产者,单消费者模式。由于您使用嵌套的匿名函数,并且堆栈溢出不会显示行号,因此很难准确描述我要指代的代码的哪一部分,但是无论如何我都会尝试。

AsyncEnumerable的工作方式基本上是等待生产者产生一个值,然后等待使用者使用该值,然后重复。它不支持生产者和消费者以不同的速度运行,因此为什么我说这种模式是连续的。它没有生产项目only the current value的队列。 ReturnAsync does not wait供使用者使用该值,而是应该等待它返回的任务,这会向您发出信号,表明它已准备就绪。因此,我们可以得出结论,它不是线程安全的。

但是,RetryComponent.RunTasks并行运行10个任务,该代码调用yield.ReturnAsync而不检查是否有人已经调用了它,以及是否已经完成该任务。由于Yield类仅存储当前值,因此您的10个并发任务会覆盖当前值,而无需等待Yield对象准备好获取新值,因此9个任务会丢失并且永远不会等待。由于这9个任务从未等待,因此方法永远不会完成,Task.WhenAll也永远不会返回,整个调用堆栈中的任何其他方法也不会执行。

I created an issue on github建议他们改进其库以在发生这种情况时引发异常。如果他们实现了它,那么catch块会将消息写入控制台并重新抛出错误,从而使任务处于错误状态,这将允许task.WhenAll完成,因此程序不会挂起。

您可以使用多线程同步API来确保一次只调用一项任务yield.ReturnAsync并等待返回任务。或者您可以避免使用多生产者模式,因为单个生产者可以轻松地成为枚举器。否则,您将需要重新考虑如何实现多生产者模式。我建议TPL Dataflow内置于.NET Core中,并作为NuGet包在.NET Framework中提供。

剃刀付款集成->如何通过关闭按钮X检测剃刀付款模型是否关闭 - javascript

当用户关闭而无需付款时,我在CI框架中使用Razorpay,请创建razor支付模型,然后取消订单,我希望按状态更改为已取消的状态触发查询。所以我怎么能检测到这一点。我已经通过单击jQuery单击关闭功能但无法使用... javascript大神给出的解决方案 Razorpay提供了JS方法来检测模式关闭。您编写的任何JS代码都不会在结帐页面上运行,因为它是…

如何使用箭头符号(->)创建受保护的方法? - java

当我们编写以下代码时Stream.of(1,2,3,4,5).filter(i -> (i%2 == 0)).map( i -> i*i ); 表达式i -> (i%2 == 0)或i -> i*i将变为私有方法。在我的用例中,编写了一个junit测试,以确保没有方法是私有的(是的,这是强制性的),并且对于这些lambda表达式而言,…

多重处理:map与map_async - python

使用map和map_async有什么区别?将列表中的项目分配给4个进程后,它们是否运行相同的功能?因此,假设两者都在异步和并行运行是错误的吗?def f(x): return 2*x p=Pool(4) l=[1,2,3,4] out1=p.map(f,l) #vs out2=p.map_async(f,l) python大神给出的解决方案 将作业映射到流程…

粗糙的Unicode->没有CLDR的语言代码? - javascript

我在写字典应用。如果用户键入Unicode字符,我想检查该字符是哪种语言。例如字 - returns ['zh', 'ja', 'ko'] العربية - returns ['ar'] a - returns ['en', 'fr', …

将谓词<T>转换为Func <T,bool> - c#

我有一个包含成员Predicate的类,希望在Linq表达式中使用该类:using System.Linq; class MyClass { public bool DoAllHaveSomeProperty() { return m_instrumentList.All(m_filterExpression); } private IEnumerable&…