Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

Amazon EMR で Apache Iceberg v3 の Deletion Vectors を検証してみた。


この記事はSansan Advent Calendar 2025 - Adventarの15日目の記事です🎄

Sansan Engineering Unit Infrastructureグループの織田繁です。

Amazon EMR 7.12 で Apache Iceberg v3 テーブルフォーマットがサポートされたので、AWS/Iceberg好きとして調べてみました。Apache Icebergで運用しており、削除/更新が多いワークロードを持つ方に興味を持って頂けますと幸いです。
aws.amazon.com


主な新機能としては以下が追加されました。
■Deletion Vectors(削除ベクトル)
データファイル全体を書き直すことなく、効率的に行レベルの削除が可能になります。削除情報をコンパクトなバイナリビットマップとして保存し、読み取り時にマージします。

■Row Lineage(行系譜)
各行の変更履歴を追跡できるようになります。行IDとシーケンス番号を使用して、スナップショット間での行レベルの変更を完全に把握できます。

■Default Column Values(デフォルトカラム値)
新しいカラムを追加する際、データファイルの書き直し(バックフィル)が不要になります。デフォルト値はメタデータに保存され、クエリ時に動的に適用されます。

■VARIANT型(半構造化データ対応)
JSON形式の半構造化データを柔軟に扱える新しいデータ型です。厳密なスキーマ定義なしでデータを取り込めます。

■Geospatial Types(地理空間データ型)
地理空間データを扱うための新しいデータ型が追加されました。

このうち、削除・更新が頻繁に発生するワークロードでの性能改善が期待できる
Deletion Vectors(削除ベクトル)について、検証してみました。

前提

従来(v2)の削除方式

Apache Iceberg v2では、データの削除や更新する際に主に2つの方式が使用できました。

■Copy-on-Write(CoW)方式

データを削除・更新する際に、対象データを含むファイル全体を新しく書き直す方式です。例えば、1つのデータファイルに1,000万行のデータが格納されていて、そのうち1行だけを削除したい場合

  • 削除対象の1行を除いた残りの行を含む新しいファイルを作成します
  • 元の1,000万行あるファイルは削除扱いとなります

課題点

  • 書き込みコストが高い : 1行だけの削除でも、ファイル全体を書き直す必要があり、大量のI/O処理が発生
  • ストレージ容量の増加 : 新しいファイルが作成されるため、一時的にストレージ使用量が増加
  • 頻繁な削除・更新に不向き : 少量の変更でも大きなコストがかかる
■Merge-on-Read(MoR)方式

データを削除・更新する際に、Delete Filesという別ファイルに削除情報を記録する方式です。Delete Filesの仕組みは

  • 削除された行の情報を別のファイル(delete file)に記録
  • 読み取り時にデータファイルとdelete fileを照合して、削除された行をスキップ
  • ファイル全体の書き直しは不要

課題点

  • 管理の複雑化 : Delete Filesの数が増えると管理が複雑になる
  • 読み取りコストの増加 : 読み取り時に複数のDelete Filesを参照する必要があり、パフォーマンスが低下する可能性
  • ストレージ効率の低下 : Delete Filesのフォーマットが効率的ではない場合がある

Deletion Vectors(v3)の改善点

Apache Iceberg v3では、v2のMerge-on-Read方式を改善した Deletion Vectors(削除ベクトル) が導入されました。

■Deletion Vectors(DV)方式

データを削除・更新する際に、削除された行の位置情報をコンパクトなビットマップ形式で記録する方式です。例えば、1つのデータファイルに1,000万行のデータが格納されていて、そのうち1行だけを削除したい場合

  • 削除ベクトル(ビットマップ)に「行番号Xが削除された」という情報を記録
  • 読み取り時に削除ベクトルを高速にチェックして削除行をスキップ
  • ファイル全体の書き直しは不要

改善点

  • ストレージ効率の大幅向上 : ビットマップ形式により、v2のDelete Filesよりもさらにコンパクト(1,000万行のデータでも、削除情報は数KB程度)
  • 読み取りパフォーマンスの向上 : ビットマップによる高速な削除チェックが可能で、複数のDelete Filesを管理する必要がない
  • スケーラビリティの向上 : 大量の削除操作でもファイル数が増えにくく、メタデータの管理が容易
  • 書き込みパフォーマンスの維持 : v2のMoR方式と同様に、ファイル全体の書き直しは不要で、より効率的なフォーマットで削除情報を記録

性能比較の実験結果

Amazon EMR Serverless上で、idとvalueだけを持つTABLEに1,000万行のデータを挿入し、100万行を削除する処理を実行し、3つの方式の性能を比較しました。

実験環境

  • プラットフォーム: Amazon EMR Serverless 7.12
  • Sparkバージョン: 3.5.6-amzn-1
  • アーキテクチャ: x86_64
  • キャパシティ: 400 vCPUs, 3000 GB memory, 20000 GB disk(デフォルト)
  • データ量: 1,000万行のテーブルを作成
  • 削除操作: 10回のバッチで100万行を削除(各バッチで10万行)
  • ストレージ: S3バケット


Scriptコードを表示する

from pyspark.sql import SparkSession
import sys
import time

def main():
    bucket = sys.argv[1] if len(sys.argv) > 1 else "shigeruoda-inventry-439023032515-20251126"

    spark = SparkSession.builder \
        .appName("Iceberg-CoW-MoR-DeletionVectors-Performance") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
        .config("spark.sql.catalog.glue.warehouse", f"s3://{bucket}/warehouse/") \
        .getOrCreate()

    print("=" * 80)
    print("Iceberg Performance Comparison: CoW vs MoR vs Deletion Vectors")
    print("=" * 80)
    print(f"Bucket: s3://{bucket}/warehouse/")

    spark.sql("CREATE DATABASE IF NOT EXISTS glue.test_db")

    # v2 Copy-on-Write (CoW) テーブル
    print("\n[1] v2 Copy-on-Write (CoW): Creating table with 10M rows...")
    spark.sql("DROP TABLE IF EXISTS glue.test_db.data_v2_cow")
    spark.sql("""
        CREATE TABLE glue.test_db.data_v2_cow (id BIGINT, value STRING)
        USING iceberg TBLPROPERTIES (
            'format-version' = '2',
            'write.delete.mode' = 'copy-on-write'
        )
    """)
    spark.range(10000000).selectExpr("id", "concat('value_', id) as value").writeTo("glue.test_db.data_v2_cow").append()

    print("Deleting 1M rows in 10 batches...")
    start = time.time()
    for i in range(10):
        spark.sql(f"DELETE FROM glue.test_db.data_v2_cow WHERE id >= {i * 100000} AND id < {(i + 1) * 100000}")
    v2_cow_time = time.time() - start

    v2_cow_files = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v2_cow.files").collect()[0][0]
    print(f"v2 CoW: {v2_cow_time:.2f}s, {v2_cow_files} data files")

    # v2 Merge-on-Read (MoR with Delete Files) テーブル
    print("\n[2] v2 Merge-on-Read (Delete Files): Creating table with 10M rows...")
    spark.sql("DROP TABLE IF EXISTS glue.test_db.data_v2_mor")
    spark.sql("""
        CREATE TABLE glue.test_db.data_v2_mor (id BIGINT, value STRING)
        USING iceberg TBLPROPERTIES (
            'format-version' = '2',
            'write.delete.mode' = 'merge-on-read',
            'write.update.mode' = 'merge-on-read',
            'write.merge.mode' = 'merge-on-read'
        )
    """)
    spark.range(10000000).selectExpr("id", "concat('value_', id) as value").writeTo("glue.test_db.data_v2_mor").append()

    print("Deleting 1M rows in 10 batches...")
    start = time.time()
    for i in range(10):
        spark.sql(f"DELETE FROM glue.test_db.data_v2_mor WHERE id >= {i * 100000} AND id < {(i + 1) * 100000}")
    v2_mor_time = time.time() - start

    v2_mor_files = spark.sql("SELECT * FROM glue.test_db.data_v2_mor.files").collect()[0][0]

    # Delete filesの数を取得(content列の値を確認)
    print("Checking file types in v2 MoR table...")
    v2_mor_file_types = spark.sql("""
        SELECT content, COUNT(*) as count
        FROM glue.test_db.data_v2_mor.files
        GROUP BY content
    """).collect()
    for row in v2_mor_file_types:
        print(f"  content={row['content']}: {row['count']} files")

    # Position delete filesを取得(content = 1 または content = 'POSITION_DELETES')
    v2_mor_delete_files = spark.sql("""
        SELECT COUNT(*) FROM glue.test_db.data_v2_mor.files
        WHERE content = 1 OR content = 'POSITION_DELETES'
    """).collect()[0][0]

    v2_mor_data_files = spark.sql("""
        SELECT COUNT(*) FROM glue.test_db.data_v2_mor.files
        WHERE content = 0 OR content = 'DATA'
    """).collect()[0][0]

    print(f"v2 MoR: {v2_mor_time:.2f}s, {v2_mor_files} total files ({v2_mor_data_files} data files, {v2_mor_delete_files} Delete Files)")

    # v3 Deletion Vectors テーブル
    print("\n[3] v3 Deletion Vectors: Creating table with 10M rows...")
    spark.sql("DROP TABLE IF EXISTS glue.test_db.data_v3_dv")
    spark.sql("""
        CREATE TABLE glue.test_db.data_v3_dv (id BIGINT, value STRING)
        USING iceberg TBLPROPERTIES (
            'format-version' = '3',
            'write.delete.mode' = 'merge-on-read',
            'write.update.mode' = 'merge-on-read',
            'write.merge.mode' = 'merge-on-read'
        )
    """)
    spark.range(10000000).selectExpr("id", "concat('value_', id) as value").writeTo("glue.test_db.data_v3_dv").append()

    print("Deleting 1M rows in 10 batches...")
    start = time.time()
    for i in range(10):
        spark.sql(f"DELETE FROM glue.test_db.data_v3_dv WHERE id >= {i * 100000} AND id < {(i + 1) * 100000}")
    v3_dv_time = time.time() - start

    v3_dv_files = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v3_dv.files").collect()[0][0]
    print(f"v3 DV: {v3_dv_time:.2f}s, {v3_dv_files} data files")

    # 削除性能の比較結果
    print("\n" + "=" * 80)
    print("DELETE Performance Comparison")
    print("=" * 80)
    print(f"v2 CoW:        {v2_cow_time:.2f}s (baseline)")
    print(f"v2 MoR:        {v2_mor_time:.2f}s ({((v2_cow_time - v2_mor_time) / v2_cow_time * 100):+.1f}%)")
    print(f"v3 DV:         {v3_dv_time:.2f}s ({((v2_cow_time - v3_dv_time) / v2_cow_time * 100):+.1f}%)")
    print(f"\nv3 DV vs v2 MoR: {((v2_mor_time - v3_dv_time) / v2_mor_time * 100):+.1f}% improvement")

    # 削除後のクエリ検証
    print("\n" + "=" * 80)
    print("Query Performance after Deletion")
    print("=" * 80)

    # COUNT query
    print("COUNT query:")
    start = time.time()
    v2_cow_count = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v2_cow").collect()[0][0]
    v2_cow_query_time = time.time() - start

    start = time.time()
    v2_mor_count = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v2_mor").collect()[0][0]
    v2_mor_query_time = time.time() - start

    start = time.time()
    v3_dv_count = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v3_dv").collect()[0][0]
    v3_dv_query_time = time.time() - start

    print(f"v2 CoW:        {v2_cow_count:,} rows, query time: {v2_cow_query_time:.2f}s")
    print(f"v2 MoR:        {v2_mor_count:,} rows, query time: {v2_mor_query_time:.2f}s")
    print(f"v3 DV:         {v3_dv_count:,} rows, query time: {v3_dv_query_time:.2f}s")

    # SELECT * query
    print("\nSELECT * query:")
    start = time.time()
    v2_cow_select_count = spark.sql("SELECT * FROM glue.test_db.data_v2_cow").count()
    v2_cow_select_time = time.time() - start

    start = time.time()
    v2_mor_select_count = spark.sql("SELECT * FROM glue.test_db.data_v2_mor").count()
    v2_mor_select_time = time.time() - start

    start = time.time()
    v3_dv_select_count = spark.sql("SELECT * FROM glue.test_db.data_v3_dv").count()
    v3_dv_select_time = time.time() - start

    print(f"v2 CoW:        {v2_cow_select_count:,} rows, query time: {v2_cow_select_time:.2f}s")
    print(f"v2 MoR:        {v2_mor_select_count:,} rows, query time: {v2_mor_select_time:.2f}s")
    print(f"v3 DV:         {v3_dv_select_count:,} rows, query time: {v3_dv_select_time:.2f}s")

    # データの整合性確認
    print("\n" + "=" * 80)
    print("Data Verification")
    print("=" * 80)

    v2_cow_range = spark.sql("SELECT MIN(id), MAX(id) FROM glue.test_db.data_v2_cow").collect()[0]
    v2_mor_range = spark.sql("SELECT MIN(id), MAX(id) FROM glue.test_db.data_v2_mor").collect()[0]
    v3_dv_range = spark.sql("SELECT MIN(id), MAX(id) FROM glue.test_db.data_v3_dv").collect()[0]

    print(f"v2 CoW id range: {v2_cow_range[0]} - {v2_cow_range[1]}")
    print(f"v2 MoR id range: {v2_mor_range[0]} - {v2_mor_range[1]}")
    print(f"v3 DV id range:  {v3_dv_range[0]} - {v3_dv_range[1]}")

    # ファイル数の比較
    print("\n" + "=" * 80)
    print("File Count Comparison")
    print("=" * 80)
    print(f"v2 CoW:        {v2_cow_files} data files")
    print(f"v2 MoR:        {v2_mor_files} total files ({v2_mor_data_files} data files, {v2_mor_delete_files} Delete Files)")
    print(f"v3 DV:         {v3_dv_files} data files (deletion vectors in metadata)")

    spark.stop()

if __name__ == "__main__":
    main()

削除性能の比較結果

方式 削除時間 改善率(対CoW) ファイル構成
v2 CoW 27.75秒 baseline 11 data files
v2 MoR 27.77秒 -0.1% 14 total files (12 data + 2 Delete Files)
v3 DV 13.89秒 +50.0% 14 data files (deletion vectors in metadata)

結果の考察

1. 削除性能(書き込み性能)

■v2 CoW: 27.75秒

  • ファイル全体を書き直すため、大規模データでは特に時間がかかる
  • ただし、ファイル数は11個と最も少ない(削除後にファイルが再作成されるため)

■v2 MoR: 27.77秒

  • CoWとほぼ同じ性能(-0.1%、誤差範囲内)
  • Delete Filesを2個作成して削除情報を管理
  • データファイル(12個)は元のまま維持
  • この実験ではCoWと同等の性能になった

■v3 DV: 13.89秒

  • CoW/MoRと比較して49.9%の改善
  • ビットマップ形式で削除情報を効率的に記録
  • Delete Filesの代わりに削除ベクトルをメタデータに保存
2. クエリ性能(読み取り性能)

COUNT クエリ

方式 行数 クエリ時間
v2 CoW 900万行 0.33秒
v2 MoR 900万行 1.20秒
v3 DV 900万行 0.72秒

SELECT * クエリ

方式 行数 クエリ時間
v2 CoW 900万行 0.08秒
v2 MoR 900万行 0.94秒
v3 DV 900万行 0.52秒

■v2 CoW : 0.33秒(COUNT)/ 0.08秒(SELECT *)

  • 削除済みのデータがファイルに含まれないため、読み取り時のオーバーヘッドがない
  • ファイル数が少ないことも有利
  • 両クエリとも最速

■v2 MoR : 1.20秒(COUNT)/ 0.94秒(SELECT *)

  • 2個のDelete Filesを参照する必要があるため、読み取り時にオーバーヘッドが大きい
  • データ量が増えるとDelete Filesのスキャンコストが増加

■v3 DV : 0.72秒(COUNT)/ 0.52秒(SELECT *)

  • MoRより約40%程度高速
  • Deletion Vectorsのチェックが必要だが、ビットマップ形式のため効率的
  • データ量が増えてもスケーラビリティが高い

まとめ

大規模データでの性能比較結果

1,000万行データでの実験により、次の傾向が確認できました。

■削除性能(書き込み)

  • v2 CoW: 27.75秒
  • v2 MoR: 27.77秒(CoWとほぼ同じ)
  • v3 DV: 13.89秒(CoW/MoRの約半分)

■読み取り性能(COUNT クエリ)

  • v2 CoW: 0.33秒(最速)
  • v2 MoR: 1.20秒(CoWの約3倍)
  • v3 DV: 0.72秒(v2 MoRより40%高速)

■読み取り性能(SELECT * クエリ)

  • v2 CoW: 0.08秒(最速)
  • v2 MoR: 0.94秒(CoWの約12倍)
  • v3 DV: 0.52秒(v2 MoRより45%高速)

ユースケース別の推奨

削除・更新中心のワークロードではv3 Deletion Vectors

  • 削除性能が高い(CoW/MoRの約2倍高速)
  • 読み取り性能もv2 MoRより大幅に高速
  • スケールしやすい

読み取り中心のワークロードではCopy-on-Write

  • 読み取り性能が最も高い
  • ただし、削除・更新には時間がかかる点に注意

we are hiring

Sansan技術本部では中途の方向けにカジュアル面談を実施しています。Sansan技術本部での働き方、仕事の魅力について、現役エンジニアの視点からお話しします。「実際に働く人の話を直接聞きたい」「どんな人が働いているのかを事前に知っておきたい」とお考えの方は、ぜひエントリーをご検討ください。

open.talentio.com


open.talentio.com

© Sansan, Inc.