こんにちは、R&D Architectグループの藤岡です。今回は Step Functions(以下SFn) が実行途中で失敗した場合に、失敗した時点から再開できるような仕組みを実現したので、その内容について共有します。
<2024/2/8追記>
AWS公式で Step Functions の失敗した時点から再開する機能がサポートされたので、この記事の内容は古くなりました。
https://aws.amazon.com/jp/about-aws/whats-new/2023/11/aws-step-functions-restarting-workflows-failure/
背景と課題
今回、R&D内で稼働しているバッチの1つに関してシステムのリプレイスを行うことになりました。リプレイスを行う理由は主に以下の2つです。
- バッチが途中で失敗した際の復旧作業が複雑になっていた。
- バッチで処理するデータ量が増え、当初よりも実行時間が長くなっていた。
本記事では、復旧対応箇所に焦点を当てて取り組んだ内容を紹介します。
具体的な現状の構成としては、複数の AWS Batch と、それを制御する Lambda で構成されており、EventBridge で Lambda が定期実行されるような仕組みになっています。図示すると以下の通りです。
これのどこに問題があるかというと、AWS Batch が失敗した場合に、Lambda のコードを書き換えて再実行しなければならないという点でした。また、各 AWS Batch で実行されるシェルスクリプトから複数のPythonスクリプトが呼ばれているので、完了したスクリプトの呼び出しをコメントアウトしてイメージを再pushする必要がありました。いわゆる運用でカバー状態となっており、負担が大きかったので、システムを丸ごと置き換えることになりました。
ではどのような構成にしたかというと、全体を制御する Lambda と AWS Batch を、SFn に置き換えました。また、各Pythonスクリプトをそれぞれ一つのステップとして、ECSタスクとして実行するようにしています。図示すると以下の通りです。
しかしここで問題があり、SFn は失敗したステップから再開する機能がサポートされておらず、ひと工夫しないといけなかったので、その内容を共有したいと思います。
途中再開する方法
さてここからが本題です。結論からいうと、AWS公式ブログで紹介されている方法を採用しました。具体的には以下のようなフローで途中再開を実現しています。
1. 失敗した SFn の実行結果を解析する。
2. 成功したステップはスキップして、失敗したステップまで遷移できるような、新たなステートマシンを作成する。
どのように途中再開を実現するか、具体的な例を示して説明します。
例えば、以下のようなワークフローのステートマシンがあったとします。
ASL(Amazon States Language:ワークフローをJSON形式で定義したもの)は以下の通りです。
{ "Comment": "A description of my state machine", "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map1", "Result": [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ], "ResultPath": "$.index" }, "Map1": { "Type": "Map", "Iterator": { "StartAt": "Step1", "States": { "Step1": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "Payload.$": "$", "FunctionName": "test_lambda1" }, "ResultPath": null, "End": true } } }, "MaxConcurrency": 1, "Next": "Map2", "ResultPath": null, "Parameters": { "index.$": "$$.Map.Item.Value" }, "ItemsPath": "$.index" }, "Map2": { "Type": "Map", "End": true, "Iterator": { "StartAt": "Step2", "States": { "Step2": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "test_lambda2" }, "ResultPath": null, "End": true } } }, "MaxConcurrency": 1, "ItemsPath": "$.index", "Parameters": { "index.$": "$$.Map.Item.Value" }, "ResultPath": null } } }
このSFnを実行した時の流れは以下のようになります。
1. Passステートで [1, 2, ..., 9] のリストが作られる。
2. Map1ステートに 1. のリストが渡され、Step1 の Lambda がイテレートされる。
3. Map2ステートに 1. のリストが渡され、Step2 の Lambda がイテレートされる。
(Mapステートについての詳細は ドキュメント を参照していただきたいですが、いわゆるループ処理を SFn 上で再現したものです。)
この SFn を実行した際に、以下のように 3. の4番目のインデックスで失敗した場合を考えます。
Step1が時間のかかる処理でなければ再実行するのが手っ取り早いですが、実行に数日かかる場合は再実行したくありません。(今回の例は Lambda なのでそこまで時間がかかることはありませんが、ECSタスクなどで重いバッチ処理を実行することを想定しています。)
ここで登場するのが前述したブログで紹介されているスクリプトです。
python gotostate.py --failedExecutionArn '<EXECUTION_ARN_OF_FAILED_STATE_MACHINE>'
事前に失敗したステートマシンの実行ARNを調べておき、上記コマンドを実行することで、本来であれば以下のような新たなステートマシンが作られます。
注目すべき点としては、GoToState という条件分岐の Choiceステートが一番最初に追加されており、 $.resuming の値によって失敗したステップまで遷移できるようになっています。
しかしここで問題があり、記事が書かれたのが2017年11月とやや古く、2021年3月に実装されたMapステートに対応していませんでした。その他にもいくつか不便な点があったので、ブログ内で公開されていたスクリプトを以下のように修正しました。
修正箇所は以下の通りです。
- Mapステートに対応
while current_event_id != 0: # multiply event id by -1 for indexing because we're looking at the reversed history current_event = failed_events[-1 * current_event_id] """ We can determine if the failed state was a parallel or map state because it an event with 'type'='ParallelStateFailed' or 'MapStateFailed' will appear in the execution history before the name of the failed state """ if current_event["type"] in ["ParallelStateFailed", "MapStateFailed"]: failed_at_parallel_or_map_state = True if current_event["type"] == "TaskStateEntered": succeed_index = json.loads(current_event["stateEnteredEventDetails"]["input"])["index"] succeed_index_list.append(succeed_index) """ If the failed state is not a parallel or map state, then the name of failed state to return will be the name of the state in the first 'TaskStateEntered' event type we run into when tracing back the execution history """ if current_event["type"] == "TaskStateEntered" and not failed_at_parallel_or_map_state: failed_state = current_event["stateEnteredEventDetails"]["name"] failed_input = json.loads(current_event["stateEnteredEventDetails"]["input"]) return failed_state, failed_input """ If the failed state was a parallel or map state, then we need to trace execution back to the first event with 'type'='ParallelStateEntered' or 'MapStateEntered', and return the name of the state """ if current_event["type"] in ["ParallelStateEntered", "MapStateEntered"] and failed_at_parallel_or_map_state: failed_state = current_event["stateEnteredEventDetails"]["name"] failed_input = json.loads(current_event["stateEnteredEventDetails"]["input"]) # Mapで順列処理の場合 if current_event["type"] == "MapStateEntered": # 最初に追加された番号は失敗しているので除く。 succeed_index_list.pop(0) for succeed_index in succeed_index_list: # 既に成功している番号を再実行しないために、次回実行時のinputから除外する。 failed_input["index"].remove(succeed_index) return failed_state, failed_input # Update the id for the next execution of the loop current_event_id = current_event["previousEventId"]
- SFn を中断した場合に対応
# 実行を停止した場合、中断したステップから再開する if failed_events[0]["type"] == "ExecutionAborted": for current_event in failed_events: if current_event["type"] == "TaskStateEntered": failed_state = current_event["stateEnteredEventDetails"]["name"] failed_input = json.loads(current_event["stateEnteredEventDetails"]["input"]) return failed_state, failed_input
- 新たに生成されたステートマシンがさらに失敗した場合にも対応
if original_start_at == "GoToState": # 2回目以降の場合は遷移先を差し替える state_machine["States"]["GoToState"]["Default"] = failed_state_name
- 新たに生成されるステートマシンの命名規則変更
def create_new_sm_name(original_name: str) -> str: *base, suffix = original_name.split("-") if suffix.isdecimal(): base_name = "-".join(base) return f"{base_name}-{int(suffix) + 1}" return f"{original_name}-1"
- デフォルトで途中再開するように変更
if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: python gotostate.py --failedExecutionArn '<Failed_Execution_Arn>'") sys.exit(1) parser = argparse.ArgumentParser(description="Execution Arn of the failed state machine.") parser.add_argument("--failedExecutionArn", dest="failedExecutionArn", type=str, required=True) args = parser.parse_args() failed_sm_state, failed_sm_info = parse_failure_history(args.failedExecutionArn) failed_sm_info["resuming"] = True failed_sm_arn = sm_arn_from_execution_arn(args.failedExecutionArn) new_machine = attach_go_to_state(failed_sm_state, failed_sm_arn) print(f"New State Machine Arn: {new_machine['stateMachineArn']}") print(f"新しく生成されたステートマシンの実行開始時に以下を入力してください。: \n{json.dumps(failed_sm_info)}")
参考
github.com
これでMapステートにも対応できるようになったので、先ほどのコマンドが成功するようになります。
ちなみに、先のブログにMapステートに対応する修正が必要とあったので、
Update March, 5 2021 – Disclaimer: This blog precedes the introduction of map state to the Amazon States Language and requires modifications to work with the map state.
OSSへの感謝の意を込めて修正PRを投げておきました。
github.com
おわりに
以上でバッチが失敗した場合に、容易に途中再開を実現することが可能となりました。
また、Mapステートには最大同時実行数を設定することができ、2以上を指定することで並列処理も可能なため、トータルのバッチ実行時間の大幅な短縮にもつながりました。
SFnの途中再開に関する情報があまり見つからなかったので、この記事がお役に立てれば幸いです。