从C#到SQL Server的批量插入策略 - c#

在我们当前的项目中,客户会将复杂/嵌套消息的集合发送到我们的系统。这些消息的频率约为。 1000-2000 msg /每秒。

这些复杂的对象包含事务数据(要添加)和主数据(如果找不到则将添加)。但是,客户没有传递主数据的ID,而是传递了“名称”列。

系统检查这些名称是否存在主数据。如果找到,它将使用数据库中的ID,否则先创建此主数据,然后再使用这些ID。

解析主数据ID后,系统会将交易数据插入SQL Server数据库(使用主数据ID)。每封邮件的主实体数量约为15-20。

以下是我们可以采用的一些策略。

我们可以先从C#代码解析主ID(如果找不到主ID,则插入主数据),然后将这些ID存储在C#缓存中。解析完所有ID后,我们可以使用SqlBulkCopy类批量插入交易数据。我们可以访问数据库15次以获取不同实体的ID,然后再访问数据库一次以插入最终数据。我们可以使用相同的连接在完成所有这些处理后将其关闭。
我们可以一次性将所有包含主数据和事务数据的消息发送到数据库(以多个TVP的形式),然后发送到存储过程中,首先为丢失的消息创建主数据,然后插入事务数据。

有人能在这种用例中建议最好的方法吗?

由于某些隐私问题,我无法共享实际的对象结构。但是,这里的假设对象结构非常接近我们的业务对象。

其中一条消息将包含有关来自不同供应商的一种产品的信息(其主数据)及其价格详细信息(交易数据):

主数据(如果找不到,则需要添加)

产品名称:ABC,产品类别:XYZ,制造商:XXX和其他一些详细信息(属性数量在15到20之间)。

交易数据(将始终添加)

供应商名称:A,标价:XXX,折扣:XXX

供应商名称:B,标价:XXX,折扣:XXX

供应商名称:C,标价:XXX,折扣:XXX

供应商名称:D,标价:XXX,折扣:XXX

对于一条消息,它属于一种产品(关于该消息的更改不会经常发生),有关主数据的大多数信息将保持不变,但交易数据将始终波动。因此,系统将检查系统中是否存在产品“ XXX”。如果不是,请检查是否存在此产品提到的“类别”。如果没有,它将为类别和产品插入新记录。对于制造商和其他主数据,将这样做。

多个供应商将同时发送有关多个产品(2000-5000)的数据。

因此,假设我们有1000个供应商,每个供应商正在发送有关10-15种不同产品的数据。每隔2-3秒,每个供应商都会向我们发送这10种产品的价格更新。他可能会开始发送有关新产品的数据,但是这种情况不会很频繁。

参考方案

#2想法可能是最好的选择(即使用多个TVP一次性处理所有15-20个实体到数据库,并处理多达2000条消息的整体)。

在应用程序层缓存主数据查找并在发送到数据库之前进行翻译听起来不错,但会遗漏一些东西:

无论如何,您将不得不点击数据库以获取初始列表
无论如何,您将不得不点击数据库以插入新条目
查找字典中的值以替换ID正是数据库要做的(假设在每个从名称到ID的查找中都使用非聚集索引)
经常查询的值会将其数据页缓存在缓冲池(这是内存缓存)中

为什么要在应用程序层复制数据库层已经提供并正在发生的事情,特别是鉴于以下情况:

15-20个实体最多可以有2万条记录(这是一个相对较小的记录,尤其是考虑到非聚集索引仅需要两个字段时:NameID可以将很多行打包为一个字段)使用100%填充系数的数据页)。
并非所有20k条目都是“活动”或“当前”的,因此您不必担心将它们全部缓存。因此,无论当前值是什么,都可以轻松地将其标识为要查询的值,而那些数据页(可能包括一些不活动的条目,但在那里没什么大不了的)将成为要缓存在缓冲池中的值。

因此,您不必担心由于可能自然而然地更改值(即为特定的Name更新了ID)可能会导致值更改(即为特定的SqlBulkCopy更新了DataTable),从而避免了旧条目的老化或强制密钥过期或重新加载。

是的,内存中缓存是一项很棒的技术,可以极大地加快网站访问速度,但是这些方案/用例适用于非数据库进程出于纯只读目的一遍又一遍地请求相同数据的情况。但是,在这种特殊情况下,数据将被合并,并且查找值的列表可能会频繁更改(而且,由于新条目而不是更新条目)。

话虽这么说,选择#2是必经之路。尽管没有15个TVP,但我已经多次成功地完成了这项技术。可能需要对该方法进行一些优化/调整以调整此特定情况,但是我发现效果很好的是:

通过TVP接受数据。我更喜欢IEnumerable<SqlDataRecord>,因为:

它使存储过程易于自成体系
它非常适合应用程序代码,以将集合完全流式传输到数据库,而无需先将集合复制到yield return;,这会复制集合,这会浪费CPU和内存。这要求您为每个集合创建一个返回for的方法,接受该集合作为输入,并使用foreachTOP (@RecordCount)[Name]循环中发送每个记录。

TVP不适用于统计信息,因此不适用于JOINing(尽管可以通过在查询中使用[Name]来缓解这种情况),但是您不必担心,因为它们仅用于填充实数。缺少任何值的表
步骤1:为每个实体插入缺少的名称。请记住,每个实体的INSERT...SELECT字段上都应该有一个非聚集索引,并且假设ID是聚集索引,则该值自然会成为索引的一部分,因此SqlBulkCopy仅会在除了帮助以下操作。还要记住,此客户端的任何先前执行(即,大致相同的实体值)都将导致这些索引的数据页保留在缓冲池(即内存)中。

;WITH cte AS
(
  SELECT DISTINCT tmp.[Name]
  FROM   @EntityNumeroUno tmp
)
INSERT INTO EntityNumeroUno ([Name])
  SELECT cte.[Name]
  FROM   cte
  WHERE  NOT EXISTS(
                 SELECT *
                 FROM   EntityNumeroUno tab
                 WHERE  tab.[Name] = cte.[Name]
                   )

步骤2:将所有“消息”插入到简单的SqlBulkCopy中,其中由于步骤1,查找表(即“实体”)的数据页已缓存在缓冲池中

最后,请记住,猜想/假设/有根据的猜测不能替代测试。您需要尝试一些方法来查看哪种方法最适合您的特定情况,因为可能还有一些尚未共享的其他细节可能会影响此处的“理想”条件。

我会说,如果消息仅是插入的,那么弗拉德的想法可能会更快。我在这里描述的方法是在更复杂且需要完全同步(更新和删除)的情况下使用的,并且需要进行其他验证和创建相关的操作数据(而不是查找值)。在直接插入上使用SendRows可能会更快(尽管对于仅2000条记录,我怀疑是否存在太多差异),但这是假设您直接将其加载到目标表(消息和查找),而不是直接加载到中间/暂存中表(我相信Vlad的想法是直接int BatchSize到目标表)。但是,如上所述,由于更新查找值的问题,使用外部高速缓存(即不使用缓冲池)也更容易出错。使外部高速缓存失效可能需要花费更多的代码,而不是值得花的钱,特别是如果使用外部高速缓存仅稍微快一点的话。需要综合考虑哪种额外的风险/维护方法才能更好地满足您的需求。

更新

根据评论中提供的信息,我们现在知道:

有多个供应商
每个供应商提供多种产品
产品并非卖方独有;产品由1个或更多供应商出售
产品属性是单数
定价信息具有可以具有多个记录的属性
定价信息仅适用于INSERT(即时间点历史记录)
唯一商品由SKU(或类似字段)确定
创建后,带有现有SKU的产品,否则具有不同属性(例如类别,制造商等)的产品将被视为同一产品;差异将被忽略

考虑到所有这些,我仍然会推荐TVP,但是要重新考虑这种方法,并使之成为以供应商为中心,而不是以产品为中心。这里的假设是供应商随时发送文件。因此,当您获取文件时,将其导入。您将提前进行的唯一查找是供应商。这是基本布局:

假设此时您已经具有VendorID似乎是合理的,因为系统为什么要从未知来源导入文件?
您可以批量导入
创建一个IEnumerable<SqlDataRecord>方法,该方法:

接受FileStream或允许前进的文件
接受类似SqlDataRecord的内容
返回SqlDataRecord
创建一个yield return;以匹配TVP结构
通过FileStream for循环,直到达到BatchSize或File中没有更多记录为止
对数据执行任何必要的验证
将数据映射到SendRows(FileStream, BatchSize)
致电IEnumerable<SqlDataRecord>

开启档案
文件中有数据时

调用存储的过程
传递VendorID
为TVP传递

关闭档案
实验:

在围绕FileStream的循环之前打开SqlConnection,并在循环完成后关闭它
打开SqlConnection,执行存储过程,然后在FileStream循环中关闭SqlConnection。

试用各种BatchSize值。从100开始,然后从200、500等。
存储的过程将处理插入新产品

使用这种类型的结构,您将发送未使用的产品属性(即,仅SKU用于查找现有产品)。但是,它可以很好地扩展,因为文件大小没有上限。如果供应商发送了50种产品,则可以。如果他们发送50k产品,则可以。如果他们发送了400万个产品(这是我正在使用的系统,并且确实处理了更新的产品信息,而该信息因其任何属性而异!),则很好。应用层或数据库层的内存量没有增加,甚至无法处理1000万个产品。导入所需的时间应与发送的产品数量同步增加。

更新2
与源数据有关的新详细信息:

来自Azure EventHub
以C#对象的形式出现(无文件)
产品详细信息通过O.P.系统的API输入
在单个队列中收集(只需将数据拉出插入数据库)

如果数据源是C#对象,那么我肯定会使用TVP,因为您可以通过我在第一次更新中描述的方法(即返回的方法)按原样发送它们。发送一个或多个TVP,以获取每个供应商的价格/报价详细信息,但发送常规输入参数以获取单个属性属性。例如:

CREATE PROCEDURE dbo.ImportProduct
(
  @SKU             VARCHAR(50),
  @ProductName     NVARCHAR(100),
  @Manufacturer    NVARCHAR(100),
  @Category        NVARCHAR(300),
  @VendorPrices    dbo.VendorPrices READONLY,
  @DiscountCoupons dbo.DiscountCoupons READONLY
)
SET NOCOUNT ON;

-- Insert Product if it doesn't already exist
IF (NOT EXISTS(
         SELECT  *
         FROM    dbo.Products pr
         WHERE   pr.SKU = @SKU
              )
   )
BEGIN
  INSERT INTO dbo.Products (SKU, ProductName, Manufacturer, Category, ...)
  VALUES (@SKU, @ProductName, @Manufacturer, @Category, ...);
END;

...INSERT data from TVPs
-- might need OPTION (RECOMPILE) per each TVP query to ensure proper estimated rows

从C#背景学习Java - c#

                    由于复杂的原因(不是C#的错,我非常喜欢该语言),我必须学习Java。我讨厌学习新的语言,而且我也不想学习我已经从C#了解的知识。有哪些资源可以在不学习所有内容的情况下教授不同之处? 参考方案 我意识到这不是一个非常具体的答案,但是我能给您的最好建议是不要以“我讨厌学习新语言”的心态来研究它。如果您熟悉C#,Java将…

从C#分配一个JavaScript变量 - c#

我有一些JavaScript,例如:<script type="text/javascript"> flashvars.myval = "blah"; //get this from c# ..etc </script> 我需要从C#中分配变量我怎样才能做到这一点?我可以调用C#方法吗?即时通讯…

从C#将Excel工作表保存为共享的不受保护的只读模式 - c#

我正在保存使用Workbook.SaveAs方法在C#中创建的工作表。 XlSaveAsAccessMode参数给了我一些问题。xlShared将工作表保存在保护模式下,因此用户无法随意使用图表并仔细检查它们。它基本上显示为无法参考数据的死图。xlExclusive允许用户保存图书的不受保护的访问权限,但不允许其他人访问。其他用户仍然看到无效的图表。我需要将…

从C++调用Python函数 - c++

我正在尝试从C++实现调用Python函数。我以为可以通过函数指针来实现,但是似乎不可能。我一直在使用boost.python完成此操作。假设在Python中定义了一个函数:def callback(arg1, arg2): #do something return something 现在,我需要将此函数传递给C++,以便可以从那里调用它。如何使用boos…

从C++创建的二进制文件中读取Int64值失败 - c#

我正在开发一个C#CE应用程序,以从由C ++ progam创建的二进制文件中读取数据以进行项目验证。下面是C ++程序的编码。 // File Name: Ean2an.bin which is created by struct struct EAN2AN_TYPE { __int64 ean:40; // 5 bytes, up to 12 digit…