Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

.NET から Azure Table Storage へのバッチ処理

高橋 洸 です。今回は最近触った Azure Table Storage の .NET SDK の話をします。簡単に扱えてしまうが故にあっさりハマることがあるので要注意です。

Table Storage は Microsoft Azure のサービスのひとつで、マネージドな NoSQL キーバリューストアを提供します。 Amazon Web Service でいう DynamoDB に相当します。

データレプリケーションのレベルにも依りますが、容量あたりのコストが非常に安価で、非構造化データ、半構造化データを大量に格納するといった用途に向いています。

ここでは .NET の SDK である Microsoft.Azure.Storage.Common を用いて Azure Table Storage へバッチ処理を行う際の作法、注意点について述べます。

まず、単一のデータを追加する場合はこのようにします。 MyEntity クラスは TableEntity クラスを継承させたものと考えてください。

CloudTable table = GetCloudTable();
MyEntity entity = CreateMyEntity();
TableOperation insertOperation = new TableOperation.Insert(entity);
await table.ExecuteAsync(insertOperation, cancellationToken);

GetCloudTable() の中身はこんな感じ。なお Azure Functions (AWS でいう AWS Lambda ) を実装する場合は CloudTable オブジェクトを外部から注入できるのでこの辺の実装は不要です。

CloudTable GetCloudTable()
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
         CloudConfigurationManager.GetSetting("StorageConnectionString"));

    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

    CloudTable table = tableClient.GetTableReference("myTable");
}

続いて複数のデータを追加する場合。以下のように書いても動くのですが、毎回の API コールのオーバーヘッドがかかる、途中で失敗した場合を考慮する必要がある、といった問題があります。

CloudTable table = GetCloudTable();
IEnumerable<MyEntity> entities = CreateMyEntities();
foreach(var entity in entities)
{
    TableOperation insertOperation = new TableOperation.Insert(entity);
    await table.ExecuteAsync(insertOperation, cancellationToken);
}

そこで Table Storage の Entity Group Transactions API を利用します。この API は 1 回のリクエストで複数オペレーションを処理するというものです。 .NET の SDK では TableBatchOperation クラスを用いることでこの API を利用できます。

CloudTable table = GetCloudTable();
IEnumerable<MyEntity> entities = CreateMyEntities();
TableBatchOperation operations = new TableBatchOperation();
foreach(var entity in entities)
{
    operations.Add(TableOperation.Add(entity));
}
await table.ExecuteBatchAsync(operations, cancellationToken);

CloudTable.ExecuteBatchAsync 内では Atomicity が担保されており、途中で失敗したらロールバックされます 1

また TableBatchOperationIList<T> の実装なので実行順序は保証されている、はず、です。

さて、公式ドキュメントのサンプルの注意事項として以下の記述があります。

  • 更新、削除、および挿入を同じ 1 回のバッチ操作で実行できます。
  • 1 つのバッチ操作には、最大 100 個のエンティティを含めることができます。
  • 1 つのバッチ操作に含まれるすべてのエンティティのパーティション キーが同じである必要があります。
  • クエリをバッチ操作として実行することもできますが、バッチ内の唯一の操作である必要があります。

一方、 Entity Group Transactions では以下の記述があります (2019/1/12 現在、日本語版がありません) 。

  • All entities subject to operations as part of the transaction must have the same PartitionKey value.
  • An entity can appear only once in the transaction, and only one operation may be performed against it.
  • The transaction can include at most 100 entities, and its total payload may be no more than 4 MB in size.
  • All entities are subject to the limitations described in Understanding the Table Service Data Model.

見比べてみると微妙に差異があります。マージしてみると、 SDK 利用時の注意事項は以下のように記述できそうです。

  • 更新、削除、および挿入を同じ 1 回のバッチ操作で実行できます。
  • 1 つのバッチ操作には、最大 100 個のエンティティを含めることができ、その合計ペイロードは最大 4 MB までです。
  • 1 つのバッチ操作に含まれるすべてのエンティティのパーティション キーが同じである必要があります。
  • クエリをバッチ操作として実行することもできますが、バッチ内の唯一の操作である必要があります。
  • 1 つのエンティティは 1 回のバッチ操作で1回しか出現できず、それに対して実行できる操作は1つだけです。

1つめ、4つめの注意事項は、そうですか、という感じですが、他はハマりどころがありそうです。

2つめのエンティティの最大件数 / 最大サイズ。これは特に悩ましいですね。 Chunk して投げてもいいですが、途中で失敗した場合をケアする必要があります。 Queue Storage などを利用して結果整合を実現する方法もありますがちょっと腰が重いです。データの一貫性がどれだけ求められるかに依って実装を検討する必要がありますね。

3つめの、パーティションキーを同一にするというもの。これはテーブル設計と絡む話です。論理モデルを考えたらこんなテーブル設計になったが、ユースケースからすると一貫性をもたせたい単位と異なっていた、ということはよく起こる話です。 NoSQL の難しいところですね。

5つめの注意事項は、意図せずハマってしまうこともありそうです。試してみましょう。 1 つのエンティティ、すなわち PartitionKey と RowKey が等しいオブジェクトの追加と削除を 1 つのバッチ操作に入れてみると、例外がスローされます。

try
{
    var operations = new TableBatchOperation
    {
        TableOperation.Insert(new MyEntity { PartitionKey = "hoge", RowKey = "fuga" }), 
        TableOperation.Delete(new MyEntity { PartitionKey = "hoge", RowKey = "fuga", ETag = "*" })
    };  
    await table.ExecuteBatchAsync(operations, cancellationToken);
}
catch (StorageException ex)
{
    // "1:One of the request inputs is not valid. ..."
    log.Error(e.RequestInformation.ExtendedErrorInformation.ErrorMessage)
}

そのほか細かい注意事項ですが entities が空だと ExecuteBatchAsync で空振り......ではなく StorageException がスローされます。 TableBatchOperation に操作を詰める前にチェックするなどのケアが必要です。

var entities = GetEntities();
if(entities.Length > 0)
{
    var operations = new TableBatchOperation();
    foreach(var entity in entities)
    {
        operations.Add(TableOperation.Add(entity));
    }
    await table.ExecuteBatchAsync(operations, cancellationToken);
}

クラウドネイティブ最高! スキーマレス最高! みたいな気持ちもある一方で、何も考えず手軽に実装を始めてしまって後で詰む、みたいなことが容易に起こります。制約というか作法や思想を理解しておくことが大事ですね。

© Sansan, Inc.