Threadpool / WaitHandle资源泄漏/崩溃 - c#

我认为我可能需要重新考虑我的设计。我很难找到导致计算机完全挂起的错误,有时会从VS 2010中抛出HRESULT 0x8007000E。

我有一个控制台应用程序(稍后将转换为服务),该应用程序根据数据库队列处理文件传输。

我正在限制允许传输的线程。这是因为我们要连接的某些系统只能包含来自某些帐户的一定数量的连接。

例如,系统A只能接受3个同时连接(这意味着3个独立的线程)。这些线程中的每个线程都有其自己唯一的连接对象,因此我们不应该遇到任何同步问题,因为它们不共享连接。

我们要循环处理这些系统中的文件。因此,例如,我们将允许3个连接,每个连接最多可以传输100个文件。这意味着,要从系统A中移出1000个文件,每个循环只能处理300个文件,因为允许3个线程,每个线程100个文件。因此,在此传输的整个生命周期中,我们将有10个线程。我们一次只能运行3个。因此,将有3个周期,最后一个周期将仅使用1个线程来传输最后100个文件。 (3个线程x 100个文件=每个周期300个文件)

例如,当前的体系结构是:

System.Threading.Timer每5秒通过调用GetScheduledTask()检查队列中是否有事情要做
如果什么也没做,GetScheduledTask()根本什么都不做
如果有工作,请创建一个ThreadPool线程来处理工作[工作线程A]
工作线程A看到有1000个文件要传输
工作线程A看到它只能在正在从中获取文件的系统上运行3个线程
工作线程A启动三个新的工作线程[B,C,D]并进行传输
工作线程A等待B,C,D [WaitHandle.WaitAll(transfersArray)]
工作线程A看到队列中还有更多文件(现在应该是700个)
工作线程A创建一个新数组以等待[transfersArray = new TransferArray[3],这是系统A的最大值,但在系统上可能有所不同
工作线程A启动三个新工作线程[B,C,D]并等待它们[WaitHandle.WaitAll(transfersArray)]
重复该过程,直到没有更多文件可移动为止。
工作线程A表示已完成

我正在使用ManualResetEvent处理信号。

我的问题是:

是否有任何明显的情况会导致资源泄漏或我遇到的问题?
我是否应该在每个WaitHandle.WaitAll(array)之后遍历数组并调用array[index].Dispose()?
此过程在任务管理器下的Handle计数缓慢增加
我正在从System.Threading.Timer调用工作线程A的初始创建。会有任何问题吗?该计时器的代码是:

(一些用于调度的类代码)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}

参考方案

0x8007000E是内存不足错误。那和句柄数似乎表明资源泄漏。确保您要处置实现IDisposable的每个对象。这包括您正在使用的ManualResetEvent数组。

如果有时间,您可能还需要转换为使用.NET 4.0 Task类;它旨在更清晰地处理此类复杂情况。通过定义子Task对象,可以减少总体线程数(线程非常昂贵,不仅因为调度,而且还因为它们的堆栈空间)。

当回复有时是一个对象有时是一个数组时,如何在使用改造时解析JSON回复? - java

我正在使用Retrofit来获取JSON答复。这是我实施的一部分-@GET("/api/report/list") Observable<Bills> listBill(@Query("employee_id") String employeeID); 而条例草案类是-public static class…

改造正在返回一个空的响应主体 - java

我正在尝试使用Retrofit和Gson解析一些JSON。但是,我得到的响应机构是空的。当我尝试从对象中打印信息时,出现NullPointerException。我确保URL正确,并且我也确保POJO也正确。我正在使用jsonschema2pojo来帮助创建POJO类。这是我要解析的JSON{ "?xml": { "@versi…

每个文件合并后添加换行 - python

我有很多类似以下内容的JSON文件:例如。1.json{"name": "one", "description": "testDescription...", "comment": ""} test.json{"name"…

Json到php,json_decode返回NULL - php

我正在用PHP进行JSON解析器的一些API,用于存储有关遗产的信息。我在解析时遇到问题,因为它返回的是NULL值而不是数组或对象。简单的JSON代码可以很好地解析,但是可以这样:{"success":true,"totalCount":1,"data":[{"id":99694…

您如何在列表内部调用一个字符串位置? - python

我一直在做迷宫游戏。我首先决定制作一个迷你教程。游戏开发才刚刚开始,现在我正在尝试使其向上发展。我正在尝试更改PlayerAre变量,但是它不起作用。我试过放在列表内和列表外。maze = ["o","*","*","*","*","*",…