技術本部 Strategic Products Engineering Unit Order One Devグループで受注業務のDXから、事業を加速するプロダクトOrder Oneの開発をしている山邊です。
本題に入る前にお知らせです。2/27 (火) に「自由な発想でつながる、失敗談を語るLTパーティー」というイベントを開催します。 ぜひ、お気軽にご参加ください! sansan.connpass.com
Order Oneにドメインイベント・非同期イベントの仕組みを導入したので、仕組みの紹介をしたいと思います。ドメインイベントの導入を検討している方の役に立てば幸いです。
ドメインイベントとは
GPTによると
ドメインイベントは、ソフトウェアの専門分野(ドメイン)において重要な出来事が起こった際に使用される概念です。例えば、オンラインショップで顧客が注文すると、「注文が完了した」というドメインイベントが生成されます。この情報を使って、在庫管理システムを更新しれたり、顧客に確認メールを送ったりできます。DDD(ドメイン駆動設計)では、このようなイベントを通じてソフトウェアのビジネスロジックが正確に表現され、システムの各部分が協調して機能します。これにより、ソフトウェアがより管理しやすく、拡張しやすいものになります。
技術スタック
Order Oneでは主にGoogle Cloud Platform(GCP)上でサーバーサイドKotlinを採用しています。今回のドメインイベントについても説明に使用するコードは全てKotlinで書かれています。また、キューイングサービスは Google Cloud Tasksを使用しています。
Order Oneでドメインイベントの導入を決めた背景
Order Oneでは、既にDDDを採用しており、非同期イベントの仕組みを利用していましたが、これにはいくつかの課題がありました。以前のシステムでは、次のような方法で非同期イベントを実行していました。
導入前の非同期イベントの実行方法
例として、ユーザー作成後に2つの後続処理を実行したい場合のコードが次の通りです。
// トランザクションを開く runInTransaction { // ユーザーを作成 val user = User.new() // 作成したユーザーを永続化 UserRepository.register(user) // ユーザーの作成通知を送信するタスクをキューに積む GoogleCloudTasksClient.createTask( queueName = "exampleQueue", targetUrl = "https://example.com/api/send-created-user-mail", requestBody = "{'id': '${user.id}'}", scheduledTime = 1 ) // ユーザーの追加情報を生成するタスクをキューに積む GoogleCloudTasksClient.createTask( queueName = "exampleQueue", targetUrl = "https://example.com/api/generate-user-additional-info", requestBody = "{'id': ${user.id}'}", scheduledTime = 1 ) }
課題点
- 非同期処理のロジックがアプリケーション層に記述されていました。そのためドメインイベントがどのタイミングで発火されるか、ドメイン同士のつながりがどうなっているかがわかりにくい状態でした。それぞれのドメイン操作やAPIがどこから呼ばれているかを判断するためにはAPIのパスでアプリケーション層を検索する必要があり、これによって仕様調査やエラー調査が難しくなっていました。
- トランザクションの処理中にタスクを登録していたため、コミット前に後続の処理が実行されるリスクがありました。多くのドメインイベントは発行元のドメインオブジェクトを参照しますが、永続化される前にこれを参照すると、ドメインオブジェクトが見つからないエラーが発生する可能性がありました。
仕組み
今回は、Order Oneで導入したドメインイベントの仕組みを3つの段階に分けて説明します。
ドメインイベント
アプリケーション層の実装は次のようになります。
User.new()
がentityを直接返さず、ドメインイベントを返すようになります。- 永続化処理は変わっていません。
- タスクをキューに積むコードの代わりにドメインイベントをパブリッシュする処理になります。
// トランザクションの開始 runInTransaction { // ユーザーの作成 val userCreatedEvent = User.new() // 作成したユーザーの永続化 UserRepository.register(userCreatedEvent.entity) // ドメインイベントのパブリケーション domainEventPublisher.publish(userCreatedEvent) }
ドメインイベントはドメインに操作を加えたときに作成されます。ドメインイベントの型は次のようなイメージで、ドメイン層からはドメインイベントを継承した型を返します。ドメインイベントに後続の処理がない場合は省略しても良いかもしれないですが、Order Oneでは原則としてドメインイベントを返す方針にしています。
ドメインイベントは asyncCommand
= 非同期で実行される処理
を持つことができます。 asyncCommand
はドメインイベントと強いつながりがありますが、分けて考えることもできます。というのもドメインイベントに属さないasyncCommandも作成できる仕組みになっています。
// ドメインイベントのinterface interface DomainEvent<Payload : DomainEvent.Payload> { data class Id(val value: UUID) interface Payload { fun asyncCommands(): List<AsyncCommand> } val id: Id val payload: Payload } // ユーザーが作成されたときのドメインイベントの例 data class UserCreatedEvent( val entity: User ) : DomainEvent(UserCreatedEvent.Payload) { data class Payload(val userId: entity.Id) : DomainEvent.Payload { override fun asyncCommands(): List<AsyncCommand> = listOf( // TODO ) } override val id = DomainEvent.Id.new() override val payload = Payload(userId = entity.id) } // Userドメインの例 data class User( val Id: UUID ){ companion object { fun new() = UserCreatedEvent(User(Id = UUID.randomUUID())) } }
そして、DomainEventPublisherの説明です。
DBに調査用のデータを永続化しています。調査時にどのコマンドがどのドメインイベントによってInvokeされたかを容易に判別するためなので、なくても支障はありません。
DomainEventPublisherは AsyncCommandInvoker.invoke()
を呼び出します。
object DomainEventPublisher { fun publish( domainEvent: DomainEvent<*> ) { // DB に調査用のデータを永続化 DomainEventRepository.create(domainEvent) // ドメインイベントに紐づくコマンドをCommandInvokerに渡す AsyncCommandInvoker.invoke( commands = domainEvent.getAsyncCommands() ) } }
コマンド
ここからはコマンドについて説明します。ドメインイベントのパブリッシャーは最終的にコマンドのインボーカーを呼び出しています。もちろんドメインイベントに属さないコマンドも作成でき、その場合はAsyncCommandInvoker.Invoke()
を直接呼び出すことになります。
AsyncCommandInvoke.Invoke()では command_awaiting_deploy
というテーブルにコマンドの内容を永続化し、後で復元できるようにしています。ここで永続化する内容はAsyncCommandにどんな値を持たせるかによって変わってきますが、代表的なものとしては id, payload, url, queue
などが含まれます。
Order OneではAPIの呼び出しごとにユニークなID(CallUUID)をCommandのキーに使用しています。これによって他のAPIで登録されたコマンドをデプロイしないようにしています。
object AsyncCommandInvoker { fun invoke( commands: List<AsyncCommand>, ){ CommandRepository.createAwaitingDeploy(commands) } }
この時点ではコマンドはデータベースに記録されているだけで、GoogleCloudTasksにタスクが積まれてはいません。トランザクションが終了した後に、処理を追加します。
トランザクション終了後にデプロイされていないコマンドがある場合はブローカーAPIを呼び出すタスクを積みます。
ここで直接コマンドをタスクに変換するという方法もありますが、GoogleCloudTasksを使用している場合、タスクを積む処理には少し時間がかかることに注意が必要です。特に多くのタスクをデプロイする必要がある場合はユーザーにレスポンスを返すのが遅くなるため避けたほうが良いでしょう。
suspend inline fun <T> runInTransaction( crossinline block: (handle: Handle) -> T ): T { val result: T = block(/* 略 */) withHandle { // デプロイされていないコマンドを調べる val commands = CommandRepositoryImpl.getAwaitingDeploy() if (commands.isNotEmpty()) { // デプロイされていないコマンドがあればブローカーAPIを呼び出すタスクを1つ作成する AsyncCommandBroker.createTask() } } return result }
ブローカー
ブローカーAPIではデプロイされていないコマンドを順番にデプロイします。冪等性を考慮して、1つのコマンドごとにトランザクションを設定しています。
そして、デプロイが完了したコマンドは command_awaiting_deploy
から削除し、 command_deployed
にログとして記録しておきます。
val commands = withHandle() { -> CommandRepository.getAwaitingDeploy() } commands.forEach { command -> // 冪等性を担保するために1つずつコミットする runInTransaction() { -> createTask(command) CommandRepository.markAsDeploy(command) logger.info("タスクに作成しました。commandId: ${command.id}") } }
導入後
- 課題の1つ目にあったドメイン同士のつながりはドメイン層に書けようになりました。これによってドメイン同士のつながりの調査が容易になり、ドメインがどこから操作されるか分かりやすくなりました。
- トランザクションが終了してからタスクをキューに積むように修正した事でエンティティが見つからずエラーになる事がなくなりました。
最後に
説明のために一部のコードを省略・改変していますが、全体的にはOrder Oneで稼働しているコードと同じです。参考になれば幸いです。