c# 許容 同時INSERTのLOTSを実行するときに「UNIQUE KEY制約の違反」を回避する方法




unique 制約 null (3)

シナリオ内のエラーを許可して抑制するほうが、エラーを排除するよりも速いかもしれません。 重複データと同期して複数のソースを統合している場合は、競合状態を管理するためにどこかにボトルネックを作成する必要があります。

ハッシュセット内のレコードの一意の制約を保持するシングルトンマネージャクラスを作成して、セットに追加されたときに重複を自動的に削除することができます。 レコードはDBに送信される前に追加され、ステートメントの完了時に削除されます。 このようにして、ハッシュセットが重複を食べるか、あなたが試しの最初に行った既存のレコードチェックのどちらかが、コミットされた重複レコードを検出します。

単一トランザクション内の特定のキーについて既存のレコードもチェックしていますが、UNIQUE KEY制約で衝突している同時並行SQL INSERTステートメントを多数実行しています。 私は、パフォーマンスを損なうことなく、衝突の量を減らす(または最小限に抑える)方法を探しています。

バックグラウンド:

私はたくさんのHTTP POSTリクエストをINSERTレコードに受け取るASP.NET MVC4 WebApiプロジェクトに取り組んでいます。 1秒間に5Kから10Kのリクエストがあります。 プロジェクトの唯一の責任は、記録の重複排除と集約です。 非常に重い書き込みです。 比較的少量の読み取り要求があります。 いずれもIsolationLevel.ReadUncommittedトランザクションを使用します。

データベーススキーマ

これがDBテーブルです。

CREATE TABLE [MySchema].[Records] ( 
    Id BIGINT IDENTITY NOT NULL, 
    RecordType TINYINT NOT NULL, 
    UserID BIGINT NOT NULL, 
    OtherID SMALLINT NULL, 
    TimestampUtc DATETIMEOFFSET NOT NULL, 
    CONSTRAINT [UQ_MySchemaRecords_UserIdRecordTypeOtherId] UNIQUE CLUSTERED ( 
        [UserID], [RecordType], [OtherID] 
    ), 
    CONSTRAINT [PK_MySchemaRecords_Id] PRIMARY KEY NONCLUSTERED ( 
        [Id] ASC 
    ) 
) 

リポジトリコード

これは、例外を引き起こしているUpsertメソッドのコードです。

using System;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using Dapper;

namespace MyProject.DataAccess
{
    public class MyRepo
    {
        public void Upsert(MyRecord record)
        {
            var dbConnectionString = "MyDbConnectionString";
            using (var connection = new SqlConnection(dbConnectionString))
            {
                connection.Open();
                using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
                {
                    try
                    {
                        var existingRecord = FindByByUniqueKey(transaction, record.RecordType, record.UserID, record.OtherID);

                        if (existingRecord == null)
                        {
                            const string sql = @"INSERT INTO [MySchema].[Records] 
                                                 ([UserID], [RecordType], [OtherID], [TimestampUtc]) 
                                                 VALUES (@UserID, @RecordType, @OtherID, @TimestampUtc) 
                                                 SELECT CAST(SCOPE_IDENTITY() AS BIGINT";
                            var results = transaction.Connection.Query<long>(sql, record, transaction);
                            record.Id = results.Single();
                        }
                        else if (existingRecord.TimestampUtc <= record.TimestampUtc)
                        {
                            // UPDATE
                        }

                        transaction.Commit();
                    }
                    catch (Exception e)
                    {
                        transaction.Rollback();
                        throw e;
                    }
                }
            }
        }

        // all read-only methods use explicit transactions with IsolationLevel.ReadUncommitted

        private static MyRecord FindByByUniqueKey(SqlTransaction transaction, RecordType recordType, long userID, short? otherID)
        {
            const string sql = @"SELECT * from [MySchema].[Records] 
                                 WHERE [UserID] = @UserID
                                 AND [RecordType] = @RecordType
                                 AND [OtherID] = @OtherID";
            var paramz = new {
                UserID = userID,
                RecordType = recordType,
                OtherID = otherID
            };
            var results = transaction.Connection.Query<MyRecord>(sql, paramz, transaction);
            return results.SingleOrDefault();
        }
    }

    public class MyRecord
    {
        public long ID { get; set; }
        public RecordType RecordType { get; set; }
        public long UserID { get; set; }
        public short? OtherID { get; set; }
        public DateTimeOffset TimestampUtc { get; set; }
    }

    public enum RecordType : byte
    {
        TypeOne = 1,
        TypeTwo = 2,
        TypeThree = 3
    }
}

問題

サーバーに十分な負荷がかかっていると、次のような多くの例外が発生します。

UNIQUE KEY制約 'UQ_MySchemaRecords_UserIdRecordTypeOtherId'に違反しています。 オブジェクト 'MySchema.Records'に重複キーを挿入できません。 重複キー値は(1234567890、1、123)です。 ステートメントは終了されました。

この例外は頻繁に発生し、1分に10回も発生します。

私が試したこと

  • IsolationLevelSerializable変更してみました。 例外はそれほど頻繁には発生しませんでしたが、それでも発生しました。 また、コードのパフォーマンスが大幅に低下しました。 システムは1秒間に2Kのリクエストしか処理できませんでした。 私は、このスループットの低下が実際に例外の減少の原因であったと思うので、これで私の問題は解決しなかったと私は結論しました。
  • UPDLOCK テーブルヒントを使用することを検討しましたが、分離レベルとどのように連携するか、またはコードに適用する方法を完全には理解していません。 それは私の現在の理解から、それはしかし最良の解決策かもしれないように思えます。
  • here示すように、最初のSELECTステートメント(既存のレコード用)をINSERTステートメントの一部として追加することも試みhereが、それでも同じ問題がありました。
  • SQL MERGEステートメントを使用してUpsertメソッドを実装しようとしましたが、これも同じ問題を抱えていました。

私の質問

  • この種のUNIQUEキー制約の衝突を防ぐためにできることはありますか?
  • UPDLOCKテーブルヒント(またはそのことについて他のテーブルヒント)を使用する必要がある場合、それをコードに追加するにはどうすればよいですか。 INSERTに追加しますか? SELECT ? 両方?

検証読み込みをロックします。

FROM SomeTable WITH (UPDLOCK, ROWLOCK, HOLDLOCK)

これは単一のキーへのアクセスを直列化し、他のすべてのキーに対する同時実行を可能にします。

HOLDLOCK (= SERIALIZABLE )は値の範囲を保護します。 これにより、存在しない行が存在し続け、 INSERTが成功するようになります。

UPDLOCKは、既存の行が別の並行トランザクションによって変更または削除されないようにして、 UPDATEが成功するようにします。

ROWLOCK 、エンジンが行レベルのロックを取得することを推奨します。

これらの変更により、デッドロックの可能性高まります。


私の知る限り、唯一の解決策はinsert前に重複をチェックすることです。 DBへの少なくとも1回のラウンドトリップが要求され、パフォーマンスが低下します。

テーブルに対してSELECTを実行してロックを保持し、他の並列スレッドがSELECTして同じ値を取得するのを防ぐことができます。 詳細な解決策は次のとおりです。EFコードでの悲観的ロック

シモンズ :私の提案した解決策は、あなたがバッファやキューを使用したくないというこの仮定に基づいていると言うべきです。





dapper