Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

チーム開発合宿 2024 in 徳島県神山町(技術編)

こんにちは、研究開発部 Architectグループの辻田です。 この記事はチームメンバー合同で作成した記事です。

先日、神山ラボへ開発合宿に行ってきました。この記事では合宿中に取り組んだ内容について紹介します。合宿の目的や、全体の様子はチーム開発合宿 2024 in 徳島県神山町(レポート編)をご参照ください。

本記事は【R&D DevOps通信】という連載記事のひとつです。

背景

Sansan Labsで大きめのリリースを控えているため、それまでに障壁となる課題を短期間で改善することが目的です。通常はLabs開発に関わりのないメンバーも含めて、全員で集中して取り組むことで、効率的に課題を解決することがこの合宿に期待していることです。

期待される成果

Sansan Labsの全アプリケーションを、簡単かつ安全にリリースできる状態になっていることが期待される成果です。
タスクの消化量は、通常1週間のスプリントでのポイントを3日間で消化できることが目標です。最終的にどれだけのポイントを消化できたかは、レポート編で紹介しています。

取り組み内容

実際の取り組み内容は以下です。それぞれの背景やゴールは各章で紹介します。

  • E2Eテスト基盤
    • ツール選定と方針決め
    • 研究員が迷わずテストを書けるようなサンプル作成
  • 複数アプリケーションを一括でリリースするCDの開発
  • Labsリソースにおける依存関係可視化の開発
    • ライブラリとそのバージョンの取得
    • 環境変数の取得
    • バッチのデータソースとなるテーブル一覧の取得

E2Eテスト基盤

背景

Labsには現在24のアプリケーションがありますが、それぞれのアプリケーションにはE2Eテストがありません。すべてのアプリケーションをリリースし、手動でテストを行うことは現実的でないです。そのため、まずはE2Eテストのサンプルと、CI上で実行できる仕組みを作ることが目的です。

期待される成果

サンプルテストの作成とCI上での実行可能な状態にすることが期待される成果です。
さらに合宿後には、サンプルを参考に各アプリケーションのテストを研究員が迷わず書けるようにすることがゴールです。

将来的にはPRを出したら、自動でPR環境にデプロイされ、E2Eテストが実行されるようにすることも検討しています。

取り組み内容

ツール選定と方針決め

LabsではフロントエンドにStreamlitを使っています。StreamlitのテストフレームワークとしてApp Testingが最近リリースされたため、これを使ってE2Eテストを行うことにしました。

App Testingには次のような特徴があります。

  • headlessでアプリのインスタンスが作成される
  • ウィジェットの状態、ウィジェットの操作などのテストができる
  • pytestで発動するので作りやすい

候補にはPlaywrightなども上がりましたが、PRごとに検証環境を立てておくのが前提となるため今回は見送ることにしました。

研究員が迷わずテストを書けるようなサンプル作成

App Testingを使って、Streamlitの全コンポーネントを網羅したようなアプリケーションのテストを行うサンプルを作成しました。

tab1, tab2 = st.tabs(["Tab 1", "Tab2"])
tab1.write("Tab 1 content")
with tab2:
    st.write("Tab 2 content")

st.multiselect(label="MultiSelect", options=["1", "2", "3"], default=["1", "2"], max_selections=2)

st.session_state.magic_word = st.session_state.get("magic_word", "Streamlit")

new_word = st.text_input("Magic word:", key="input")

if st.button("Set the magic word"):
    st.session_state.magic_word = new_word

st.radio("Radio", ["1", "2"], 0)

以下は、上記Streamlitのコンポーネントに対するテストの一部です。

selectbox, text_input, buttonなどが正しく動作するかを確認しています。

def test_native_components() -> None:
    at = AppTest.from_file("app.py")
    at.run()

    # selectboxが一つしかない
    assert len(at.selectbox) == 1
    # 選択されている値が予想通りか
    assert at.selectbox[0].options[at.selectbox[0].index] == "1"
    # selectboxを操作して、その結果が予想通りかを確認する
    at.selectbox[0].select_index(2)
    assert at.selectbox[0].options[at.selectbox[0].index] == "3"

    # tabが2つある
    assert len(at.tabs) == 2
    # tab1の中身が予想通りか
    assert at.tabs[0].markdown[0].value == "Tab 1 content"
    # tab2の中身が予想通りか
    assert at.tabs[1].markdown[0].value == "Tab 2 content"

    # multiselectが一つしかない
    assert len(at.multiselect) == 1
    # multiselectのラベルが予想通りか
    assert at.multiselect[0].label == "MultiSelect"
    # multiselectの最大選択数が予想通りか
    assert at.multiselect[0].max_selections == 2
    # multiselectの想定要素が予想通りか
    assert at.multiselect[0].options == ["1", "2", "3"]
    # multiselectを操作して、その結果が予想通りかを確認する
    at.multiselect[0].set_value(["1", "3"])
    assert at.multiselect[0]._value == ["1", "3"]
    # multiselectの要素を削除して、その結果が予想通りか確認する
    at.multiselect[0].unselect("3")
    assert at.multiselect[0]._value == ["1"]

    # text_inputが一つしかない
    assert len(at.text_input) == 1
    # text_inputに入力した値が予想通りか
    at.text_input("input").input("test").run()
    assert at.text_input("input").value == "test"

    # buttonが一つしかない
    assert len(at.button) == 1
    # buttonのラベルが予想通りか
    assert at.button[0].label == "Set the magic word"

    # session_stateのデフォルト値が予想通りか
    assert at.session_state["magic_word"] == "Streamlit"
    at.button[0].click().run()
    # text_inputに入力した値が session_state に保存されているか
    assert at.session_state["magic_word"] == "test"

    # radioが一つしかない
    assert len(at.radio) == 1
    # radioのラベルが予想通りか
    assert at.radio[0].label == "Radio"
    # radioの選択されている値が予想通りか
    assert at.radio[0].options[at.radio[0].index] == "1"
    # radioを操作して、その結果が予想通りかを確認する
    at.radio[0].set_value("2")
    assert at.radio[0].options[at.radio[0].index] == "2"

Labsではデータの可視化にAgGridコンポーネントも使っています。そのため、AgGridコンポーネントのテストも行いました。

まず、AgGridコンポーネントを取得する関数を作成しました。子要素がある場合は再帰的に取得し、UnknownElementを取得します。その後、UnknownElementの中からAgGridコンポーネントを取得します。

def get_unknown_element(elements: list[Element]) -> list[UnknownElement]:
    unknown_elements = []
    for elem in elements:
        if hasattr(elem, "children"):
            unknown_elements.extend(get_unknown_element(elem.children))
        else:
            if isinstance(elem, UnknownElement):
                unknown_elements.append(elem)
    return unknown_elements


def get_aggrid_components(elements: list[Element]) -> list[dict]:
    aggrid_components = []
    unknown_elements = get_unknown_element(elements)
    for elem in unknown_elements:
        if hasattr(elem, "proto") and elem.proto.component_name == "st_aggrid.agGrid":
            aggrid_component = json.loads(elem.proto.json_args)
            aggrid_component["row_data"] = json.loads(aggrid_component["row_data"])
            aggrid_components.append(aggrid_component)
    return aggrid_components

AgGridのデータが正しく表示されているか、ページネーションが正しく動作しているか、列の数やヘッダーが正しいかを確認しています。

def test_labs_aggrid() -> None:
    at = AppTest.from_file("app.py")
    at.run()
    aggrid_component = get_aggrid_components(at.main)[0]

    data = aggrid_component["row_data"]
    expected_headers = ["id", "name", "company", "address"]
    assert len(data) > 0
    assert len(data) == 1000
    assert list(data[0].keys()) == expected_headers

    grid_options = aggrid_component["gridOptions"]
    assert grid_options["paginationPageSize"] == 20
    assert len(grid_options["columnDefs"]) == 4
    assert [col["headerName"] for col in grid_options["columnDefs"]] == expected_headers
    assert aggrid_component["frame_dtypes"]["id"] == "i"
    assert aggrid_component["frame_dtypes"]["name"] == "O"
    assert aggrid_component["frame_dtypes"]["company"] == "O"
    assert aggrid_component["frame_dtypes"]["address"] == "O"

以上がApp Testingを使ったテストの紹介です。

複数アプリケーションを一括でリリースするCD

背景

LabsはEKSのアプリケーション基盤で動いています。現在のデプロイフローは次の通りです。

  1. 各アプリケーションはGitHub ActionsでECRにDockerイメージをプッシュ
  2. Argo CD Image UpdaterがECRへのプッシュを検知し、アプリケーション基盤のリポジトリへイメージタグ更新のPRを出す
  3. PRがマージされると、Argo CDが更新されたイメージをデプロイ

このように自動化されているため、ひとつのアプリケーションのリリースでは問題ありません。 しかし、複数アプリケーションを一括でリリースしようとすると、上記手順を実行し、それぞれに作られるPRを全てマージする必要があります。

そのため、複数アプリケーションを一括でビルドし、別リポジトリにリリースPR1つだけ出すCDを開発しました。

期待される成果

複数アプリケーションを一括でリリースするCDが完成し、リリースが簡単に行える状態が期待される成果です。

取り組み内容

Github Actionsのworkflow_dispatchを使って、入力したアプリケーションを一括でビルドし、アプリケーション基盤のリポジトリへPRを出すworkflowを作りました。

ざっくりとしたフローは次の通りです。

  1. ユーザーがリリース対象のアプリケーション名を入力し、workflow_dispatchを実行
  2. 入力されたアプリケーションを並列でビルドし、ECRにプッシュ
  3. kustomization.yamlのイメージタグを更新&コミット
  4. アプリケーション基盤のリポジトリへリリースPRを出す
name: Multiple ECR Image Update

on:
  workflow_dispatch:
    inputs:
      apps:
        type: string
        description: "Application name separated by commas(ex. app-1,app-2)"
        required: true
        default: "app-1,app-2,app-3,app-4,app-5"
      app_type:
        type: choice
        description: "Application type"
        required: true
        options:
          - app
          - batch
          - api
      environment:
        type: choice
        description: "Environment"
        required: true
        options:
          - staging
          - production

jobs:
  set-matrix:
    runs-on: ubuntu-latest
    outputs:
      apps: ${{ steps.setmatrix.outputs.apps }}
      image_tag: ${{ steps.generate_image_tag.outputs.image_tag }}
    timeout-minutes: 5
    steps:
      - name: Set matrix
        id: setmatrix
        run: |
          # カンマで文字列を分割して配列に格納する
          inputs=${{ inputs.apps }}
          array=(${inputs//,/ })

          quoted_apps=()
          for element in "${array[@]}"; do
              # 要素をダブルクォーテーションで囲んで配列に追加
              quoted_apps+=("$element")
          done

          apps=$(printf '%s\n' "${quoted_apps[@]}" | jq -R . | jq -s -c .)
          echo "apps=$apps" >> $GITHUB_OUTPUT

      - name: Generate Image tag
        id: generate_image_tag
        run: |
          # Image Updaterが反応しないようにyyyy-mm-dd-hhmmss形式にする
          image_tag=$(date +'%Y-%m-%d-%H%M%S')
          echo "image_tag=$image_tag" >> $GITHUB_OUTPUT

  update-ecr-image-and-commit:
    needs: set-matrix
    name: "[${{ inputs.environment }}][${{ matrix.app }}][${{ inputs.app_type }}]Multiple Update ECR Image"
    runs-on: ubuntu-latest
    # These permissions are needed to interact with GitHub's OIDC Token endpoint.
    permissions:
      id-token: write
      contents: read
    timeout-minutes: 30
    strategy:
      fail-fast: false
      matrix:
        app: ${{ fromJson(needs.set-matrix.outputs.apps) }}
    environment: ${{ inputs.environment }}
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Sets common env variables
        run: |
          APP_NAME=${{ matrix.app }}
          echo "UNDER_SCORE_APP_NAME=${APP_NAME//-/_}" >> $GITHUB_ENV

      - name: Check deploy target existence
        id: check_files
        uses: andstor/file-existence-action@076e0072799f4942c8bc574a82233e1e4d13e9d6 # v3.0.0
        with:
          files: apps/${{ env.UNDER_SCORE_APP_NAME }}/${{ inputs.app_type }}

      - name: Generate github token
        if: steps.check_files.outputs.files_exists == 'true'
        id: generate_token
        uses: tibdex/github-app-token@3beb63f4bd073e61482598c45c71c1019b59b73a # v2.1.0
        with:
          app_id: ${{ secrets.APP_ID }}
          private_key: ${{ secrets.RANDD_ARTIFACT_GITHUB_APP_PRIVATE_KEY }}

      - name: Configure AWS Credentials
        if: steps.check_files.outputs.files_exists == 'true'
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ASSUME_ROLE_ARN }}
          aws-region: ap-northeast-1

      - name: Login to Amazon ECR Private
        if: steps.check_files.outputs.files_exists == 'true'
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v2

      - name: Build, tag, and push docker image to Amazon ECR
        if: steps.check_files.outputs.files_exists == 'true'
        id: build_push_image
        env:
          REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          REPOSITORY: randd/labs/${{ matrix.app }}/${{ inputs.app_type }}
          github_access_token: ${{ steps.generate_token.outputs.token }}
        run: |
          DOCKER_BUILDKIT=1 docker build --secret id=github_access_token -t $REGISTRY/$REPOSITORY:${{ needs.set-matrix.outputs.image_tag }} .
          docker push $REGISTRY/$REPOSITORY:${{ needs.set-matrix.outputs.image_tag }}
        working-directory: apps/${{ env.UNDER_SCORE_APP_NAME }}/${{ inputs.app_type }}

  create-pull-request:
    needs: [set-matrix, update-ecr-image-and-commit]
    name: "[${{ inputs.environment }}][${{ inputs.app_type }}]Create Pull Request to randd_circuit"
    runs-on: ubuntu-latest
    # These permissions are needed to interact with GitHub's OIDC Token endpoint.
    permissions:
      id-token: write
      contents: read
    timeout-minutes: 10
    environment: ${{ inputs.environment }}
    steps:
      - name: Generate github token for circuit
        id: generate_token_circuit
        uses: tibdex/github-app-token@3beb63f4bd073e61482598c45c71c1019b59b73a # v2.1.0
        with:
          app_id: ${{ vars.RANDD_CIRCUIT_APP_ID }}
          private_key: ${{ secrets.RANDD_CIRCUIT_PRIVATE_KEY }}

      - name: Checkout source with token and branch
        uses: actions/checkout@v4
        with:
          repository: ${{ vars.CIRCUIT_REPO }}
          token: ${{ steps.generate_token_circuit.outputs.token }}
          sparse-checkout: "services/labs"

      - name: Update kustomization.yaml
        run: |
          git config --local user.email "github-actions[bot]@users.noreply.github.com"
          git config --local user.name "github-actions[bot]"
          branch_name=labs-${{ inputs.app_type }}-update-image-tag-${{ github.sha }}
          git switch -c $branch_name

          for app in $(echo '${{ needs.set-matrix.outputs.apps }}' | jq -r '.[]'); do
              yaml_file="services/labs/$app/${{ inputs.app_type }}/overlays/aws-randd-${{ inputs.environment }}/kustomization.yaml"
              if [ -e $yaml_file ]; then
                  sed -i "s/\(newTag: \)[^\n]*/\1\"${{ needs.set-matrix.outputs.image_tag }}\"/" "$yaml_file"
              fi
          done

          git add .
          git commit -m "${{ inputs.app_type }}: multiple update image tag"
          git push -u origin HEAD:"$branch_name"

      - name: create Pull Request to circuit
        id: create_pr
        run: |
          gh pr create \
            --title "[${{ inputs.environment }}][labs][${{ inputs.app_type }}]Multiple image tag updates" \
            --body "Multiple image tag updates." \
            --repo ${{ vars.CIRCUIT_REPO }} \
            --base main \
            --head "labs-${{ inputs.app_type }}-update-image-tag-${{ github.sha }}"
        env:
          GH_TOKEN: ${{ steps.generate_token_circuit.outputs.token }}

      - name: Check image tag
        if: steps.create_pr.outcome == 'success'
        run: |
          echo "[${{ inputs.environment }}] [${{ inputs.app_type }}] image tag: \`${{ needs.set-matrix.outputs.image_tag }}\`" >> $GITHUB_STEP_SUMMARY

ポイントは次の通りです。

  • set-matrixジョブ
    • 入力したアプリケーションを並列でビルドするためのmatrixを作成
  • pdate-ecr-image-and-commitジョブ
    • matrixで作成したアプリケーションを並列でビルド
    • Image Updaterはコミットハッシュで発火するため、それを回避するためにyyyy-mm-dd-hhmmss形式を使用
    • アプリケーションによってはapp, batch, apiが存在しないため、andstor/file-existence-actionでチェックを行い、存在する場合のみビルドを行う
  • create-pull-requestジョブ
    • すべてのアプリケーションのビルドが成功したときだけ動くよう、needsで依存関係を設定
    • sparse-checkoutで必要なディレクトリだけを取得
    • kustomization.yamlを更新し、アプリケーション基盤のリポジトリに対してPRを出す

以上で、複数アプリケーションを一括でリリースするCDが完成しました。

Labsリソースにおける依存関係の可視化

背景

Labsのサービスが外部に依存しているライブラリやデータソースなどの情報はあらゆるファイルに点在しており、整理することが困難となっています。

現状では、外部に何らかのアップデートや障害があった際、修正箇所や影響範囲の特定に時間がかかってしまいます。 そのため、ライブラリやデータソースなど外部依存を可視化する必要があります。

期待される成果

Labsサービスの依存関係をGithub Actionsにて抽出し、Github wikiに表として可視化することが最終成果です。 可視化するリソースとその目的は次の通りです。

  • 使用しているライブラリとそのバージョン一覧
    • サービスごとのバージョンにばらつきがあるか、古すぎるバージョンを使用しているかの確認
  • サービスごとの環境変数
    • 連携先のパスが変更されたときの対応
    • ColossusではなくS3を参照している箇所の特定
    • URLベースからURIベースに変更
  • 各バッチが依存しているテーブル(Athena, Colossus)
    • 障害が起きた時の影響範囲の特定
    • データソースが提供終了した際の修正箇所の特定

これらを次のような手順を踏むActionsを作成し、実現します。

  1. 対象のリソースを抽出
  2. 次項で説明するPython Scriptにて該当箇所を抽出
  3. Wikiに書き込み

取り組み内容

Actions内でPythonを実行する際の工夫

Actionsの開発をする際に頭を悩ませるのはshellです。

sh, bashの差分やローカルとの違い、さまざまな面で思わぬ時間ロスになります。

今回の開発は比較的ロジックも軽く作り直しが容易であるため、Actions開発をPythonで行うことのテストを兼ねて開発環境をRye+UV+Ruff+mypy+pytestで構築しました。実行環境ではrequirements.lockを使用するという構成を試しています。

今のところ、便利な点としてはshellで書いた際よりもロジックが分かりやすいため、レビューがスムーズに進むと感じました。ただ長期的に見た際のメンテナンス性など、まだまだ見えていないリスクがあるため、じっくり見極めていきたいです。

ライブラリとそのバージョンの取得

使用しているライブラリはすべてpyproject.tomlに格納されています。

抽出する情報は次の通りです。

  • サービス名
  • ライブラリ名
  • ライブラリのバージョン

pyproject.tomlファイルへのパスを入力として各項目を取得するpythonコードは以下になります。

from tomllib import loads

results = []
for path in paths:
    with open(path, "r") as f:
        pyproject = loads(f.read())
        results += extract_library_versions(pyproject)

def extract_library_versions(pyproject: dict[str, Any]) -> list[list[str]]:
    results: list = []
    if "poetry" in pyproject["tool"]:
        name: str = pyproject["tool"]["poetry"]["name"]
        if "tool" in pyproject and "poetry" in pyproject["tool"]:
            # extract dependencies
            if "dependencies" in pyproject["tool"]["poetry"]:
                for key in pyproject["tool"]["poetry"]["dependencies"]:
                    results.append(
                        [
                            name,
                            key,
                            extract_version(pyproject["tool"]["poetry"]["dependencies"][key]),
                        ]
                    )

            # extract dev-dependencies
            if "dev-dependencies" in pyproject["tool"]["poetry"]:
                for key in pyproject["tool"]["poetry"]["dev-dependencies"]:
                    results.append(
                        [
                            name,
                            key,
                            extract_version(pyproject["tool"]["poetry"]["dev-dependencies"][key]),
                        ]
                    )

    return results

def extract_version(dependency: str | dict[str, Any]) -> str:
    if isinstance(dependency, str):
        return dependency
    elif isinstance(dependency, dict) and "tag" in dependency:
        return dependency["tag"]
    else:
        return ""

このコードでは、前段でfindにて抽出されたpyproject.tomlファイルを読み込み、dependenciesとdev-dependenciesに書かれたライブラリのversionsを抽出しています。

また、このコードはpoetryに向けて書かれたものなので、今後ryeによって書かれたものが出現し始めた際にはコードの追加が必要になります。

環境変数の取得

各サービスの環境変数はアプリケーション基盤のマニフェストファイル(yaml)に記述されています。

主に抽出する情報は次の通りです。

  • サービス名
  • サービスのkind(CronJob, Deploymentなど)
  • 環境変数の名前
  • 環境変数の値

環境変数の値の中にはExternal SecretsやConfig Mapの参照などがあり、それらのkey, valueも取得します。

以下がyamlファイルのパスから各項目を出力するpythonコードです。

from yaml import safe_load_all

results = []
with open(path, "r") as f:
    manifests = safe_load_all(f.read())
    for manifest in manifests:
        results += extract_env_values(manifest, "production")
        
def extract_env_values(manifest: dict[str, Any], environment: str) -> list[list[str]]:
    results = []
    if manifest["kind"] == "Deployment":
        for container in manifest["spec"]["template"]["spec"]["containers"]:
            for env in container["env"]:
                extract_values = extract_values_from_env(env)
                if len(extract_values) == 5:
                    results.append(
                        [
                            manifest["metadata"]["name"],
                            manifest["kind"],
                        ]
                        + extract_values
                    )
    elif manifest["kind"] == "CronJob":
        for container in manifest["spec"]["jobTemplate"]["spec"]["template"]["spec"]["containers"]:
            for env in container["env"]:
                extract_values = extract_values_from_env(env)
                if len(extract_values) == 5:
                    results.append(
                        [
                            manifest["metadata"]["name"],
                            manifest["kind"],
                        ]
                        + extract_values
                    )
    elif manifest["kind"] == "CronWorkflow":
        for template in manifest["spec"]["workflowSpec"]["templates"]:
            if "container" in template:
                if "env" in template["container"]:
                    for env in template["container"]["env"]:
                        extract_values = extract_values_from_env(env)
                        if len(extract_values) == 5:
                            results.append(
                                [
                                    manifest["metadata"]["name"],
                                    manifest["kind"],
                                ]
                                + extract_values
                            )
    return results


def extract_values_from_env(env: dict[str, Any]) -> list[str]:
    if "valueFrom" in env:
        if "configMapKeyRef" in env["valueFrom"]:
            return [
                env["name"],
                "",
                "configMapKeyRef",
                env["valueFrom"]["configMapKeyRef"]["key"],
                env["valueFrom"]["configMapKeyRef"]["name"],
            ]
        elif "secretKeyRef" in env["valueFrom"]:
            return [
                env["name"],
                "",
                "secretKeyRef",
                env["valueFrom"]["secretKeyRef"]["key"],
                env["valueFrom"]["secretKeyRef"]["name"],
            ]
    elif "value" in env:
        return [
            env["name"],
            env["value"],
            "value",
            "",
            "",
        ]
    return []

このコードではActionsの前段でfindにて抽出された*.yamlのうち、Deployment, CronJob, CronWorkflowのkindのものを読み込み、そのEnvの内容を抽出しています。

バッチのデータソースとなるテーブル一覧の取得

Labsにおいては、データソースへのクエリはすべてsqlファイルにまとめられています。

抽出する情報は次の通りです。

  • サービス名
  • GCPプロジェクト名(Colossusのみ)
  • データベース名
  • テーブル名

sqlファイルからテーブル名を抽出する際はpythonのsql_metadataというライブラリを使いました。

sqlファイルのパスからテーブルの情報を取得する関数は以下になります。

import sql_metadata

def parse_sql_files_to_table_list(
    sql_file: Path,
    service_name: str,
    athena_table_data: List[AthenaTableInfoTuple],
    colossus_table_data: List[ColossusTableInfoTuple],
) -> None:
    with sql_file.open("r") as sf:
        sql_contents = sf.read()
    for sql in sql_contents.split(";")[:-1]:
        sql = sql.replace('"', "")
        try:
            tables: List[str] = sql_metadata.Parser(sql).tables
        except ValueError:
            # create *** function 構文はパースできないが、テーブル情報は含まないため無視する
            if "create temp function" in sql.lower() or "create temporary function" in sql.lower():
                continue
            else:
                raise ValueError(f"file_path: {str(sql_file)}\nFailed to parse the SQL: {sql}")
        for table in tables:
            table_info = table.split(".")
            if len(table_info) == 2:  # Athenaは{database_name}.{table_name}の形式
                athena_table_data.append(AthenaTableInfoTuple(service_name, table_info[0], table_info[1]))
            elif len(table_info) == 3:  # Colossusは{gcp_project}.{database_name}.{table_name}の形式
                colossus_table_data.append(
                    ColossusTableInfoTuple(
                        service_name,
                        table_info[0],
                        table_info[1],
                        table_info[2],
                    )
                )

この関数の呼び出し側でsqlファイルパスを取得する際に、sqlファイルの上位にあるサービスディレクトリの名前(サービス名)も取得しています。 sqlファイルを読み込み、sql_metadataにより各クエリをパースしています。

sql_metadataでは、FROM句やINSERT JOIN句などのテーブルを参照する箇所をtablesとして一覧化することできます。

おわりに

以上が合宿で取り組んだ内容です。合宿後は、これらの成果を元にLabsの開発がより効率的に進むことを期待しています。リリース頻度やリードタイムに改善が見られるどうかは引き続きモニタリングします。

Architectグループでは一緒に働く仲間を募集しています。

R&D MLOps/DevOpsエンジニア



20240312182329

© Sansan, Inc.