Sansan Builders Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

【Techの道も一歩から】第37回「データの集計に Luigi を使ってみる」

f:id:kanjirz50:20190104142720j:plain

こんにちは。 DSOC R&D グループの高橋寛治です。

あるプロジェクトにおいて、 「DB からデータを取得し整形した結果を出力する」という一連の処理をパイプラインパッケージである Luigi を用いて行いました。

ワークフローの監視や記述が簡単に記述できて良かった*1と感じたため、ここで流れを紹介したいと思います。

Luigi

Luigi は Spotify が開発・運用しているオープンソースの Python のワークフローパッケージです。

pip install luigi で簡単に導入できます。

Luigi は単独で動作させたり、スケジューラによりスケジューリングや依存関係の可視化、タスク履歴を閲覧することができます。

ワークフローはタスクから構成されます。 ざっくり説明すると、一つのタスクは一つのクラスで表現されます。 クラスは Luigi が準備しているクラスを継承し、必要な実装を行います。 依存するタスクや出力先、処理など、タスクを構成する要素を実装します。

まとめると、Luigi が提供するお作法に従うことで、ワークフロー自体をどう実装するかはあまり考えず、どのデータを使うかや具体的な処理に集中できるということです。

下記が Luigi のお作法の例となります。

import luigi


class LuigiEventTask(luigi.Task):
    def output(self):
        pass

    def requires(self):
        pass

    def run(self):
        pass

ワークフローにしたいこと

今回は、「ある一日の記事をDBから取得し、分析を行った結果を保存する」という一連の流れを記述します。 次の工程に分解し、それぞれをタスクとします。

  • 記事を DB から取得(ArticleCollectionTask)
  • 記事カテゴリ分類(CategoryClassificationTask)
  • 記事言語判定(LanguageDetectionTask)
  • 結果の保存(SaveAnalysisResultTask)

ここで、記事分類と記事言語判定は、記事取得の結果を利用するものです(依存している)。 結果の保存は、記事単位でカテゴリや言語情報を保存するため、カテゴリ分類結果や言語判定結果の出力に依存しています。

以下は、Luigi のスケジューラが可視化した、依存関係です。

f:id:kanjirz50:20210308114001p:plain
Luigi による依存関係の可視化

上記のワークフローを実際に記述します。

ワークフローを記述する

記事を取得するタスクを記述します。 ファイル名は sample_workflow.py とします。

import datetime
import os

import luigi
from utils import collect_article


class ArticleCollectionTask(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    destination_dir = "article_collection_task"

    def output(self):
        return luigi.local_target.LocalTarget(
            path=os.path.join(self.destination_dir, f"{ self.date.strftime('%Y%m%d') }.jsonl")
        )

    def run(self):
        df = collect_article(self.date)
        with self.output().open("wt") as fout:
            df.to_json(fout, orient="records", lines=True, force_ascii=False)

「ある一日」を実行時の引数として渡したいため、 DateParameter を定義します。

出力(destination_dir)は、このタスク名のディレクトリ配下に、日付.jsonl と JSON Lines 形式で出力します。

処理は、独自に定義した collect_article が date 型を引数にとり条件に合致するものを DataFrame として返します。 出力先は Luigi が用意する output メソッドにより提供されます。

この流れで後段のタスクも記述します。 後段のタスクでは前段タスクへの依存が出てきます。 例えば、CategoryClassificationTaskArticleCollectionTask に依存します。 タスク間の依存関係は、次のように requires デコレータで簡単に記述できます。

import json
import pandas as pd

from luigi.util import requires
from utils import classify_category


@requires(ArticleCollectionTask)
class CategoryClassificationTask(luigi.Task):
    destination_dir = "category_classification_task"

    def output(self):
        return luigi.local_target.LocalTarget(
            path=os.path.join(self.destination_dir, f"{ self.date.strftime('%Y%m%d') }.jsonl")
        )

    def run(self):
        with self.input().open("r") as fin:
            df = pd.read_json(fin, orient="records", lines=True)

        with self.output().open("wt") as fout:
            for _, row in df.iterrows():
                classified = classify_category(row)
                fout.write(json.dumps(classified , ensure_ascii=False) + "\n")

最後のタスクは二つの中間タスクの結果に依存します。 requires デコレータは二つのタスクを引数に取ることができます。

@requires(CategoryClassificationTask, LanguageDetectionTask)
class SaveAnalysisResultTask(luigi.Task):
    destination_dir = "save_analysis_result_task"

    def run(self):
        self.input() # [CategoryClassificationTask の出力先, LanguageDetectionTask の出力先]の順で格納されている
        # 処理を記述

self.input() の内容が複数の出力先を保持します。 あとは、具体的な処理を書いてワークフローの記述は完了です。

ワークフローを動かす

Luigi のスケジューラを次のコマンドで起動します。 5000番ポートで起動します。

luigid --pidfile ./tmp/pidfile --logdir ./log --port=5000

次にタスクを実行します。引数 dateluigi.DateParameter で指定した箇所に渡されます。

python sample_workflow.py SaveAnalysisResultTask --scheduler-port=5000 --date="2021-03-03"

最後に、次のように実行されたことが表示されます。

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 1 complete ones were encountered:
    - 1 ArticleCollectionTask(date=2021-03-03)
* 3 ran successfully:
    - 1 CategoryClassificationTask(date=2021-03-03)
    - 1 LanguageDetectionTask(date=2021-03-03)
    - 1 SaveAnalysisResultTask(date=2021-03-03)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

また、スケジューラ管理画面の history から履歴を見ることもできます。

f:id:kanjirz50:20210308120645p:plain
Luigiのhistory

このような流れでワークフローを記述し、スケジューラを稼働させ、ワークフローを処理・可視化することができます。

コードとして残す

Luigi を使ってワークフローを記述して感じたことは、コードとして残しつつスケジューラによるわかりやすい可視化が行われることが便利だということです。

コードとして残す部分では、入出力および処理についてコード内での見るべき箇所がすぐにわかります。 わかりやすい可視化は依存関係をグラフで描画してくれるため、説明や理解が簡潔になります。

Makefile やシェルスクリプトいろいろ書いていたのですが、Luigi のような今風なツールを積極的に使っていこうと思いました。

執筆者プロフィール

高橋寛治 Sansan株式会社 DSOC (Data Strategy & Operation Center) 研究開発部 研究員

阿南工業高等専門学校卒業後に、長岡技術科学大学に編入学。同大学大学院電気電子情報工学専攻修了。在学中は、自然言語処理の研究に取り組み、解析ツールの開発や機械翻訳に関連する研究を行う。大学院を卒業後、2017年にSansan株式会社に入社。キーワード抽出など自然言語処理を生かした研究開発に取り組む。

▼本連載のほかの記事はこちら

buildersbox.corp-sansan.com

*1:Makefile やシェルスクリプトをよく書いてしまいますが、他者への布教だと Luigi はいいように感じました。

© Sansan, Inc.