Pekko CQRS/Event Sourcing 完全開発ガイド

どうも 技術顧問の加藤です。

プレセナ社内の、とある案件で使っている、CQRS/Event Sourcingについてブログ記事を書いてみようと思います。

ソースコードの詳細はこちらを参照してください。

なぜCQRS/Event Sourcingなのか

従来のCRUD中心の設計の最大の問題は、Domain Driven Design(DDD)を適切に実践できないことです。CRUD中心のアーキテクチャでは、UIやアプリケーション層が「作成」「更新」「削除」といったデータ操作の語彙に支配され、ビジネスドメインの豊かな振る舞いやユーザーの意図が見えなくなってしまいます。

例えば、ECサイトで「商品を注文する」というビジネス行為を考えてみましょう。CRUD中心の設計では、UIに「注文の作成」「注文の更新」といった画面が並び、ユーザーの意図(「注文を確定する」「配送先を変更する」「注文をキャンセルする」)が失われます。実際のビジネスでは「在庫を確保する」「決済を開始する」「配送を手配する」など、多様な振る舞いが存在するにもかかわらず、CRUD中心の設計では、これらの重要なビジネスロジックがUI層やアプリケーション層に散在し、ドメインモデルは単なるデータの入れ物(Anemic Domain Model)になってしまいます。

CQRS/Event Sourcingはこの問題を解決します:

  • CQRSにより、書き込み側(コマンド)でビジネスの振る舞いを豊かにモデリングし、読み取り側(クエリ)で効率的なデータ取得を実現できます。コマンドはユーザーの意図を直接表現するため、ドメインエキスパートとの対話で生まれる「ユビキタス言語」をそのままコードに反映できます
  • Event Sourcingにより、システムが「何をしたか」の完全な履歴が残り、新しいビジネス要件に応じて過去のデータを再解釈できます

性能面でも、書き込みと読み取りを独立して最適化・スケールできる利点がありますが、これは副次的な効果です。CQRS/Event Sourcingの本質的な価値は、複雑なビジネスドメインを適切にモデリングできることにあります。

ただし、複雑性が増すため、すべてのシステムに適しているわけではありません。ビジネスが競争優位を得られる複雑なドメインでこそ、最大の効果を発揮します。この記事では、Apache PekkoとScala 3を使った実装例を通じて、CQRS/Event Sourcingの実践的な使い方を解説します。

概要

CQRS/Event Sourcingとは

CQRS(Command Query Responsibility Segregation:コマンドクエリ責任分離)は、データの「書き込み(コマンド)」と「読み取り(クエリ)」のモデルを分離するアーキテクチャパターンです。従来の設計では同じモデルで読み書きを行いますが、CQRSでは以下のように分離します:

  • コマンド側(書き込み): ビジネスルールを実行し、データを変更する
  • クエリ側(読み取り): 効率的にデータを読み取るために最適化されたモデルを提供する

Event Sourcing(イベントソーシング)は、すべての状態変更を「イベント」として記録するパターンです。現在の状態を直接保存するのではなく、状態に至るまでのイベントの履歴を保存します。これにより:

  • 完全な履歴: すべての変更履歴が残る
  • 監査証跡: 誰が、いつ、何を変更したかが明確
  • 時間旅行: 過去の任意の時点の状態を再現可能
  • イベント駆動: イベントを他のシステムに配信して連携可能

システム構成

CQRSは構成としては、コマンドAPIクエリAPIリードモデルアップデータの3つに分かれます。

  1. コマンドAPI: ユーザーからのコマンドを受け取り、ビジネスルールを検証し、イベントを生成・保存
  2. クエリAPI: 最適化されたリードモデルからデータを高速に取得
  3. リードモデルアップデータ: イベントを監視し、クエリ用のリードモデルを非同期に更新

エントリポイント

エントリポイントはここを参照してください。

コマンドAPI及びクエリAPIはGraphQLサーバーです。pekko-http上でSangriaを提供しています。リードモデルアップデータ(以下RMU)はドメインイベントを受け取ってリードモデルを構築する、AWS Lambdaの実装です。

コマンドAPIとクエリAPIはsbt-native-packagerによってDocker Imageにビルドされます。RMUはLocal Stackにデプロイします。

コマンド側のドメインモデルはPekkoのアクター上で実行されます。実際の本番ではクラスター上でアクターを実行する必要がありますが、ローカルでは単一プロセスまたはクラスタモードで起動できます。詳細な起動方法はREADME.mdを参照してください。

コマンド側

ドメインモデル

ドメインモデルは、ビジネスルールを表現する純粋なScalaコードです。アクターとビジネスロジックを一緒に実装すると、設計が複雑になりテストもむずかしくなります。そのため、Pekkoによる実装ではアクターを集約として実装しますが、アクターとドメインモデルは明確に分離します。

この分離により、以下の利点が得られます:

  1. テスタビリティ: ビジネスロジックを純粋な関数としてテストでき、アクターの仕組みに依存しない
  2. ビジネスロジックの明確化: Pekkoの技術的詳細とビジネスルールが混ざらず、ドメインロジックが理解しやすい
  3. 再利用性: 異なる永続化技術やフレームワークでも同じドメインモデルを使える
  4. 保守性: ビジネスルールの変更が、アクターのライフサイクル管理に影響しない

コードはここから。基本的にはScalaのみに依存する、エンティティや値オブジェクトなどの純粋なクラス実装です。

今回はUserAccount集約を例に解説します。

UserAccountエンティティはここです。

一つ特徴的なところと言えば、Event Sourcing前提なので状態遷移する際にイベントを返すというところです。

trait UserAccount extends Entity {
  def id: UserAccountId
  def name: UserAccountName
  def emailAddress: EmailAddress

  // 名前変更:新しい状態とイベントを返す
  def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]

  // 削除:新しい状態とイベントを返す
  def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}

実装では、ビジネスルールを検証してから状態を変更します:

override def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
  if (name == newName) {
    Left(RenameError.FamilyNameSame)  // ビジネスルール:同じ名前への変更は不可
  } else {
    val updated = this.copy(name = newName, updatedAt = DateTime.now())
    val event = UserAccountEvent.Renamed_V1(
      id = DomainEventId.generate(),
      entityId = id,
      oldName = name,
      newName = newName,
      occurredAt = DateTime.now()
    )
    Right((updated, event))  // 新しい状態とイベントを返す
  }

このように、ドメインモデルは:

  • 純粋な関数: 副作用なく状態遷移をモデル化
  • ビジネスルールの検証: 不正な操作はLeftでエラーを返す
  • イベント生成: 状態変更と同時にイベントを生成

集約

集約は、ドメインイベントを永続化し、ドメインモデルのライフサイクルを管理するアクターです。集約をアクターとして実装することで、以下の利点が得られます:

  1. 並行性の管理: アクターモデルにより、同じ集約への同時アクセスが自動的にシリアライズされる
  2. 状態の一貫性: メッセージ処理が逐次的なため、競合状態が発生しない
  3. イベントの永続化: Pekko Persistenceがイベントの保存とリプレイを自動的に管理
  4. 障害からの復旧: アクターの再起動時、イベントをリプレイして状態を復元
  5. 分散配置: クラスター上で集約を分散配置し、スケールアウト可能

UserAccount集約はここです。

集約はPekkoの型付きアクターとして実装され、永続化機能を持ちます:

def apply(id: UserAccountId): Behavior[Command] = {
  val config = PersistenceEffectorConfig
    .create[UserAccountAggregateState, UserAccountEvent, Command](
      persistenceId = s"${id.entityTypeName}-${id.asString}",
      initialState = UserAccountAggregateState.NotCreated(id),
      applyEvent = (state, event) => state.applyEvent(event)
    )
    .withPersistenceMode(PersistenceMode.Persisted)
    .withSnapshotCriteria(SnapshotCriteria.every(1000))  // 1000イベントごとにスナップショット
    .withRetentionCriteria(RetentionCriteria.snapshotEvery(2))  // 最新2つを保持

  Behaviors.setup[Command] { implicit ctx =>
    PersistenceEffector.fromConfig(config) {
      case (initialState: UserAccountAggregateState.NotCreated, effector) =>
        handleNotCreated(initialState, effector)
      case (initialState: UserAccountAggregateState.Created, effector) =>
        handleCreated(initialState, effector)
      // ...
    }
  }
}

状態はNotCreated、Created、Deletedの複数ありますが、最初はNotCreated状態から始まります。NotCreated状態でのコマンド処理:

private def handleNotCreated(
  state: UserAccountAggregateState.NotCreated,
  effector: PersistenceEffector[...]
): Behavior[Command] = Behaviors.receiveMessagePartial {
  case Create(id, name, emailAddress, replyTo) if state.id == id =>
    val (newState, event) = UserAccount(id, name, emailAddress)  // ドメインモデルで生成
    effector.persistEvent(event) { _ =>
      replyTo ! CreateSucceeded(id)
      handleCreated(UserAccountAggregateState.Created(newState), effector)  // Created状態へ
    }
}

Created状態でのRenameコマンド処理:

private def handleCreated(...): Behavior[Command] = Behaviors.receiveMessagePartial {
  case Rename(id, newName, replyTo) if state.user.id == id =>
    state.user.rename(newName) match {  // ドメインモデルに委譲
      case Left(reason) =>
        replyTo ! RenameFailed(id, reason)
        Behaviors.same  // 状態維持
      case Right((newUser, event)) =>
        effector.persistEvent(event) { _ =>  // イベント永続化
          replyTo ! RenameSucceeded(id)
          handleCreated(state.copy(user = newUser), effector)  // 状態更新
        }
    }
}

重要なポイント:

  • 状態駆動: 各状態で受け付けるコマンドが異なる
  • イベント永続化: persistEventでイベントを保存してから状態遷移
  • ドメインモデルへの委譲: ビジネスルールはドメインモデルが担当

イベント

イベントは、過去に起きた事実を表現する不変のデータです。Event Sourcingを採用することで、以下の利点が得られます:

  1. 完全な監査証跡: すべての変更が記録され、誰が・いつ・何を変更したかが明確
  2. 時間旅行デバッグ: 過去の任意の時点の状態を再現し、バグの原因を特定できる
  3. イベント駆動連携: 他のシステムにイベントを配信して、疎結合な連携が可能
  4. ビジネス分析: イベント履歴から業務の傾向やパターンを分析できる
  5. データ移行の柔軟性: イベントを再生して、新しいデータモデルを構築できる

イベントは「過去に起きた事実」を表現します。コードはこちらです。

enum UserAccountEvent extends DomainEvent {
  case Created_V1(id: DomainEventId, entityId: UserAccountId, name: UserAccountName, ...)
  case Renamed_V1(id: DomainEventId, entityId: UserAccountId, oldName: UserAccountName, newName: UserAccountName, ...)
  case Deleted_V1(id: DomainEventId, entityId: UserAccountId, ...)
}

イベントの特徴:

  • 過去形で命名: CreatedRenamedDeleted など、すでに起きたことを表現
  • 不変(Immutable): 一度作成されたら変更できない
  • バージョニング: _V1 サフィックスでバージョン管理。将来イベント構造を変更する場合は _V2 を追加
  • 必要な情報を含む: イベントだけで何が起きたかを完全に理解できる

ドメインモデルの状態遷移メソッド(renamedeleteなど)は、新しい状態とともにイベントを返します:

def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]

このパターンにより、ビジネスロジックの実行とイベントの生成が密接に結びつき、ドメインモデルの整合性が保たれます。

ユースケース

ユースケース層は、アプリケーションの進行役(オーケストレーター)としての責務を担います。コードはこちらです。

重要:ユースケースはビジネスロジックを直接書く場所ではありません。

ユースケース層の責務:

  1. 進行役(Orchestrator): 複数の処理ステップを調整・制御する
  2. アクターとの通信: Pekkoアクターにメッセージを送信し、レスポンスを受け取る
  3. エラーハンドリング: アクターからのエラーをアプリケーション層のエラーに変換
  4. 副作用の管理: 非同期処理やエラー処理をZIOで型安全に記述
def createUserAccount(
  userAccountName: UserAccountName,
  emailAddress: EmailAddress
): IO[UserAccountUseCaseError, UserAccountId] =
  for {
    // 1. IDを生成(進行役として準備)
    userAccountId <- ZIO.succeed(UserAccountId.generate())

    // 2. アクターに処理を委譲(ビジネスロジックはアクターとドメインモデルが担当)
    reply <- askActor[CreateReply] { replyTo =>
      Create(id = userAccountId, name = userAccountName,
             emailAddress = emailAddress, replyTo = replyTo)
    }

    // 3. レスポンスを変換(進行役として結果を整理)
    result <- reply match {
      case CreateSucceeded(id) => ZIO.succeed(id)
    }
  } yield result

責務の分離

  • ユースケース: 「誰に」「何を」依頼するかを決める進行役
  • ドメインモデル: ビジネスルールを検証・実行する(renameの同名チェックなど)
  • アクター: 永続化とライフサイクル管理を担当

ユースケースはPekkoアクターへの橋渡し役に徹することで、ビジネスロジックがドメイン層に集約され、各層の責務が明確になります。

インターフェイスアダプタ

インターフェイスアダプタ層は、外部(ユーザーやシステム)とドメイン層を繋ぐ役割を担います。コマンド側のインターフェイスアダプタには以下の要素があります。

コントラクト(Protobuf定義)

Protocol Buffersを使用して、アクターが受け取るコマンドメッセージとレスポンスメッセージを定義します。コントラクト定義を参照してください。

例えば、値オブジェクトの定義:

syntax = "proto3";

package io.github.j5ik2o.pcqrses.command.interfaceAdapter.persistence.basic;

message UserAccountName {
  string first_name = 1;
  string last_name = 2;
}

Protobufを使う理由:

  • 型安全: コンパイル時に型チェックされる
  • 言語非依存: 異なる言語間でも同じメッセージ定義を共有可能
  • 効率的: バイナリフォーマットで高速・省サイズ
  • バージョニング: フィールド追加に対する後方互換性(フィールド番号を変えない)

イベントシリアライザ

イベントをバイナリ形式(Protobuf)にシリアライズ/デシリアライズします。コードはこちらです。

まず、イベントのProtobuf定義:

message UserAccountEvent_Created_V1 {
  string event_id = 1;
  string user_account_id = 2;
  basic.UserAccountName user_name = 3;
  string email_address = 4;
  google.protobuf.Timestamp occurred_at = 5;
}

message UserAccountEvent_Renamed_V1 {
  string event_id = 1;
  string user_account_id = 2;
  basic.UserAccountName old_name = 3;
  basic.UserAccountName new_name = 4;
  google.protobuf.Timestamp occurred_at = 5;
}

Pekko Persistenceは、イベントをジャーナル(データベース)に保存する際、バイナリ形式に変換します。カスタムシリアライザを実装することで:

  • パフォーマンス: 効率的なバイナリ形式で保存
  • 互換性: イベント構造が変わっても古いイベントを読み込み可能(V1, V2など)
  • 柔軟性: バージョニング戦略を自由に実装
class UserAccountEventSerializer extends SerializerWithStringManifest {
  override def manifest(o: AnyRef): String = o match {
    case _: UserAccountEvent.Created_V1 => "UserAccountCreatedV1"
    case _: UserAccountEvent.Renamed_V1 => "UserAccountRenamedV1"
    // ...
  }

  override def toBinary(o: AnyRef): Array[Byte] = {
    // ドメインイベント → Protobufメッセージ → バイト列
    o match {
      case event: UserAccountEvent.Created_V1 =>
        val proto = UserAccountEvent_Created_V1(
          eventId = event.id.asString,
          userAccountId = event.entityId.asString,
          // ...
        )
        proto.toByteArray
    }
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    // バイト列 → Protobufメッセージ → ドメインイベント
    manifest match {
      case "UserAccountCreatedV1" =>
        val proto = UserAccountEvent_Created_V1.parseFrom(bytes)
        UserAccountEvent.Created_V1(
          id = DomainEventId(proto.eventId),
          entityId = UserAccountId(proto.userAccountId),
          // ...
        )
    }
  }
}

HTTPエンドポイント

外部からのリクエストを受け取り、ユースケースを呼び出してレスポンスを返します。コマンドAPIを参照してください。

このプロジェクトでは、Pekko HTTP上でGraphQLサーバー(Sangria)を提供しています。GraphQLを使用することで、クライアントは必要なデータだけを柔軟に取得できます。

クエリ側

クエリ側は読み取り専用のリードモデルを提供します。コマンド側とは完全に分離されており、読み取りに最適化されたデータ構造を持ちます。

なぜクエリ側にユースケース層がないのか

クエリ側にはユースケース層がありません。なぜならば、GraphQLのDSL自体がユースケース層の役割を果たすからです。GraphQLのクエリ定義が、どのデータをどのように取得するかを表現します。

インターフェイスアダプタ

インターフェイスアダプタにはDAOとGraphQLのリゾルバがあります。

DAO(データアクセス)

DAOはデータベースへのアクセスを抽象化します。コードはこちらです。

このプロジェクトでは、Slick(Scalaの型安全なデータベースアクセスライブラリ)を使用し、sbt-dao-generatorでデータベーススキーマからDAOクラスを自動生成します。

自動生成されたDAO:

// このファイルは自動生成ファイルなので直接編集しないでください。
trait UserAccountsComponent extends SlickDaoSupport {
  import profile.api._

  // レコード定義
  final case class UserAccountsRecord(
    id: String,
    firstName: String,
    lastName: String,
    createdAt: java.sql.Timestamp,
    updatedAt: java.sql.Timestamp
  )

  // テーブル定義
  final case class UserAccounts(tag: Tag) extends TableBase[UserAccountsRecord](tag, "user_accounts") {
    def id: Rep[String] = column[String]("id")
    def firstName: Rep[String] = column[String]("first_name")
    def lastName: Rep[String] = column[String]("last_name")
    def createdAt: Rep[java.sql.Timestamp] = column[java.sql.Timestamp]("created_at")
    def updatedAt: Rep[java.sql.Timestamp] = column[java.sql.Timestamp]("updated_at")

    def pk: PrimaryKey = primaryKey("pk", id)

    override def * : ProvenShape[UserAccountsRecord] =
      (id, firstName, lastName, createdAt, updatedAt) <> (
        UserAccountsRecord.apply,
        UserAccountsRecord.unapply)
  }

  // DAOオブジェクト
  object UserAccountsDao extends TableQuery(UserAccounts.apply)
}

自動生成の利点:

  • 型安全: コンパイル時にSQLの誤りを検出
  • 生産性: ボイラープレートコードを手書きしなくて済む
  • 一貫性: スキーマとコードが常に同期

DAOの生成方法はREADME.mdを参照してください。

使用例:

// 全件取得
UserAccountsDao.findAll()

// IDで検索
UserAccountsDao.findById(id)

// 複数IDで検索
UserAccountsDao.findByIds(ids)

// 条件検索(Slickの型安全なクエリ)
UserAccountsDao
  .filter(u => u.firstName.toLowerCase.like(s"%${searchTerm.toLowerCase}%"))
  .result

GraphQLリゾルバ

GraphQLリゾルバは、GraphQLクエリをDAOへのデータベースクエリに変換します。コードはこちらです。

Field(
  "getUserAccount",
  OptionType(UserAccountType),
  arguments = UserAccountIdArg :: Nil,
  resolve = ctx => {
    val id = ctx.arg(UserAccountIdArg)
    ctx.ctx.runDbAction(UserAccountsDao.findById(id))
  }
)

GraphQLを使用することで:

  • 柔軟性: クライアントが必要なフィールドだけを取得可能
  • 効率性: 1回のリクエストで複数のリソースを取得可能
  • 型安全: スキーマ定義によりAPI契約が明確

データベースマイグレーション

データベーススキーマの変更はFlywayで管理します。コードはこちらです。

Flywayはバージョン管理されたSQLマイグレーションスクリプトを順番に実行します。マイグレーションファイルは V{バージョン}__{説明}.sql という命名規則に従います。これにより、スキーマの変更履歴が明確になり、チーム開発でのデータベース変更が安全に行えます。

マイグレーションの実行方法はREADME.mdを参照してください。

リードモデルアップデータ

リードモデルアップデータは、イベントを監視し、クエリ用のリードモデルを更新するコンポーネントです。同期的にリードモデルを更新すると、コマンド処理がクエリ側の更新完了を待つ必要があり、レスポンス時間が長くなります。また、クエリ側のデータベース障害がコマンド処理を停止させてしまい、システム全体の可用性が低下します。

非同期更新を採用することで、以下の利点が得られます:

  1. 疎結合: コマンド側とクエリ側が独立し、互いに影響を与えない
  2. 独立したスケーリング: 読み取り負荷と書き込み負荷に応じて、それぞれを個別にスケール可能
  3. パフォーマンス: コマンド処理がクエリ側の更新を待つ必要がなく、高速にレスポンスを返せる
  4. 柔軟性: 複数の異なるリードモデルを、同じイベントストリームから構築できる
  5. 障害の分離: クエリ側の障害が、コマンド側の処理に影響しない

仕組みの概要

  1. コマンド側がイベントをジャーナル(DynamoDB)に保存
  2. DynamoDB Streamsがイベントの変更を検知
  3. AWS Lambda(リードモデルアップデータ)がイベントを処理
  4. クエリ側データベース(PostgreSQL)のリードモデルを更新

この非同期パイプラインにより、コマンド側とクエリ側が疎結合に保たれ、それぞれが独立してスケールできます。

イベントプロセッサ

イベントプロセッサは、DynamoDB Streamsから受け取ったイベントを解釈し、適切なデータベース操作に変換します。コードはこちらです。

処理の流れ:

private def processUserAccountEvent(event: UserAccountEvent): Either[ProcessingError, Unit] = {
  val action = event match {
    case UserAccountEvent.Created_V1(_, entityId, name, _, occurredAt) =>
      // リードモデルにレコードを挿入
      UserAccountsDao.insertOrUpdate(record)

    case UserAccountEvent.Renamed_V1(_, entityId, _, newName, occurredAt) =>
      // リードモデルのレコードを更新
      UserAccountsDao.filter(_.id === entityId).update(...)

    case UserAccountEvent.Deleted_V1(_, entityId, _) =>
      // リードモデルからレコードを削除
      UserAccountsDao.filter(_.id === entityId).delete
  }
  db.run(action)
}

重要なポイント:

  • イベント駆動: コマンド側の変更を検知して自動的に処理
  • べき等性: 同じイベントを複数回処理しても結果が変わらない(insertOrUpdateを使用)
  • エラーハンドリング: 失敗したイベントはログに記録し、後で再処理可能

AWS Lambda実装

このプロジェクトでは、リードモデルアップデータをAWS Lambdaとして実装しています。

Lambdaを使う理由:

  • サーバーレス: インフラ管理が不要
  • 自動スケール: イベント量に応じて自動的にスケール
  • コスト効率: 実行時間に対してのみ課金
  • DynamoDB Streams統合: ネイティブに統合されており設定が簡単

ローカル開発ではLocalStackを使用してAWS環境をシミュレートします。LocalStackへのデプロイやDynamoDB Streamsとの接続方法はREADME.mdを参照してください。

永続化

Event Sourcingでは、すべての状態変更をイベントとして永続化します。Pekko Persistenceがこの永続化メカニズムを提供します。

ジャーナル

ジャーナルは、イベントを順番に記録するストレージです。このプロジェクトでは、DynamoDBをジャーナルとして使用し、pekko-persistence-dynamodb-journal-v2pekko-persistence-dynamodb-snapshot-v2を利用しています。

ジャーナルの特性を考えると、DynamoDBは理想的な選択肢です:

  1. Append-Onlyの特性に最適:

    • ジャーナルは「追記のみ」で、柔軟なクエリは不要
    • DynamoDBは高速な追記(Write)に最適化されている
  2. シンプルなアクセスパターン:

    • イベントの追記:特定の集約IDに対してイベントを順番に追加
    • イベントのリプレイ:特定の集約IDのイベントを順番に読み取り
    • 複雑なJOINやクエリは不要
  3. CDC(Change Data Capture)のためのDynamoDB Streams:

    • イベントの変更を自動的に検知し、後段のシステムに配信
    • リードモデルアップデータがStreamsを監視して、クエリ側を更新
    • イベント駆動アーキテクチャの実現に不可欠
  4. 運用面の利点:

    • 高スループット:大量のイベント書き込みを高速に処理
    • 自動スケール:負荷に応じて自動的にスケールアウト
    • 低レイテンシ:ミリ秒単位の応答時間
    • マネージドサービス:運用負荷が低い

ジャーナルの役割:

  • イベントの永続化: すべてのドメインイベントを不変の記録として保存
  • 順序保証: 各集約(エンティティ)のイベントを順番通りに記録
  • リプレイ: アクター起動時、過去のイベントを再生して現在の状態を復元
  • CDC: DynamoDB Streamsを通じて、イベントを後段のシステムに配信

例えば、UserAccountが以下のイベント履歴を持つ場合:

1. Created_V1(name="太郎 山田")
2. Renamed_V1(oldName="太郎 山田", newName="次郎 山田")
3. Renamed_V1(oldName="次郎 山田", newName="三郎 山田")

アクター起動時、これらのイベントを順番に適用して最新の状態(name="三郎 山田")を復元します。

スナップショット

スナップショットは、定期的に保存される集約の状態のコピーです。スナップショットを使うことで、以下の利点が得られます:

  1. 起動時間の短縮: 数万のイベントをリプレイせず、最新のスナップショットから開始できる
  2. メモリ効率: 古いイベントをメモリに保持する必要がない
  3. パフォーマンス: リカバリ時間が予測可能で安定する
  4. スケーラビリティ: アクターの起動が高速なため、より多くのアクターを扱える

スナップショットの仕組み:

  • 定期的な状態保存: 一定数のイベントごとに現在の状態を保存
  • 高速リカバリ: 最新のスナップショットから復元し、その後のイベントだけをリプレイ
.withSnapshotCriteria(SnapshotCriteria.every(1000))  // 1000イベントごとにスナップショット
.withRetentionCriteria(RetentionCriteria.snapshotEvery(2))  // 最新2つのスナップショットを保持

例えば、1万個のイベントがある場合:

  • スナップショットなし: 1万個のイベントをすべてリプレイ(遅い)
  • スナップショットあり: 最新のスナップショット(9000個目)から復元し、残り1000個だけをリプレイ(速い)

データベース設定

コマンド側とクエリ側で異なるデータベースを使用しています。異なるデータベースを使い分けることで、以下の利点が得られます:

  1. 用途の最適化: 書き込み特化と読み取り特化で、それぞれに最適なDBを選択
  2. 性能の最大化: イベントストアには高速書き込み、クエリには柔軟な検索
  3. 独立したスケーリング: 書き込み負荷と読み取り負荷に応じて、個別に最適化
  4. 技術の進化対応: 片側だけDB技術を変更しても、他方に影響しない

このプロジェクトでは以下のデータベースを使用します:

  1. DynamoDB(コマンド側)

    • ジャーナル:イベントストア
    • スナップショット:状態のスナップショット
    • 理由:高スループット、自動スケール、AWSエコシステムとの統合
  2. PostgreSQL(クエリ側)

    • リードモデル:正規化されたクエリ用テーブル
    • 理由:リレーショナルクエリに最適、複雑な検索が可能

このポリグロット永続化により、それぞれの用途に最適なデータベースを選択できます。

テスト

CQRS/ESシステムのテストは、各層で異なるアプローチが必要です。

ユニットテスト

ドメインモデルのテストは最も重要です。ビジネスロジックが正しく動作することを保証します。

ドメインモデルはPekkoに依存しない純粋なScalaコードなので、テストが容易です:

test("ユーザー名を変更できる") {
  val (user, _) = UserAccount(id, name, email)
  val result = user.rename(newName)

  result shouldBe Right((updatedUser, RenamedEvent))
}

アクターのテストにはActorTestKitを使用します:

val testKit = ActorTestKit()
val probe = testKit.createTestProbe[Reply]()
val actor = testKit.spawn(UserAccountAggregate(id))

actor ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))

統合テスト

統合テストでは、実際のデータベースやLambdaとの連携を検証します。LocalStackを使用して、AWS環境をローカルで再現し、エンドツーエンドのフローをテストできます。

テストの実行方法はREADME.mdを参照してください。

デプロイと運用

Docker構成

このプロジェクトはsbt-native-packagerを使用してDockerイメージをビルドします。各サービス(コマンドAPI、クエリAPI)が独立したDockerイメージになり、個別にスケール可能です。

LocalStack設定

LocalStackは、AWSサービスをローカルで実行するためのツールです。開発時にAWSアカウントなしでテストできます。

LocalStackを使用したローカル環境では以下が実行されます:

  1. Docker Composeで必要なサービスを起動(DynamoDB、PostgreSQL、LocalStack)
  2. Lambda関数をLocalStackにデプロイ
  3. DynamoDB StreamsをLambdaに接続
  4. コマンドAPIとクエリAPIを起動

クラスターモード

本番環境では、Pekkoアクターをクラスターモードで実行します。複数のノードが協調して動作し、高可用性とスケーラビリティを実現します。

クラスターモードでは:

  • アクターの分散配置: ユーザーアカウントアクターが複数ノードに分散
  • 自動フェイルオーバー: ノードがダウンしても他のノードが処理を引き継ぐ
  • 負荷分散: リクエストが複数ノードに分散される

単一プロセスモードとクラスターモードの違いは、主にapplication.confの設定です。開発時は単一プロセス、本番ではクラスターを使用します。

詳細な実行方法はREADME.mdを参照してください。

トレードオフと適用判断

CQRS/Event Sourcingは強力なパターンですが、すべてのシステムに適しているわけではありません。

メリット

  • 完全な監査証跡: すべての変更履歴が自動的に記録され、コンプライアンス要件に対応しやすい
  • 独立したスケーリング: 読み取りと書き込みを個別に最適化・スケールでき、パフォーマンス要件に柔軟に対応
  • 時間旅行デバッグ: 過去の任意の時点の状態を再現でき、バグの原因特定が容易
  • 柔軟なクエリモデル: 同じイベントストリームから複数の異なるリードモデルを構築できる

デメリット

  • 複雑性の増加: コマンド側・クエリ側・リードモデルアップデータの3つのコンポーネントを管理する必要がある
  • 結果整合性: リードモデルの更新は非同期なため、コマンド実行直後はクエリで最新データが取得できない可能性がある
  • 学習コスト: Event SourcingやCQRSの概念を理解し、適切に実装するには時間がかかる
  • 運用コスト: イベントストアの管理、スナップショットの戦略、リードモデルの再構築など、運用上の考慮事項が増える

適用判断

適している場合:

  • 監査証跡が必須のシステム(金融、医療、法務など)
  • 読み取りと書き込みの負荷特性が大きく異なるシステム
  • 複雑なビジネスルールを持ち、ドメインモデルの明確な分離が必要なシステム

適していない場合:

  • シンプルなCRUDアプリケーション
  • 小規模なシステムで複雑性が正当化できない場合
  • 強い整合性が必須で、結果整合性を許容できない場合

まとめ

この記事では、Apache PekkoとScala 3を使用したCQRS/Event Sourcingの実装例を解説しました。

CQRS/Event Sourcingの基本

  • コマンド側とクエリ側を分離することで、それぞれを最適化できる
  • イベントソーシングにより、完全な履歴が残り、監査証跡として活用できる
  • 非同期のリードモデル更新により、システムが疎結合になる

実装のポイント

  • ドメインモデル: ビジネスルールを純粋なScalaコードで表現し、Pekkoから独立させる
  • アクター: 集約をアクターとして実装し、永続化と状態管理を担当
  • イベント: 過去形で命名し、バージョニングで進化可能にする
  • リードモデルアップデータ: イベント駆動でクエリ側を非同期に更新
  • ポリグロット永続化: 用途に応じて最適なデータベースを選択

技術スタック

  • Apache Pekko: アクターモデルとEvent Sourcingの基盤
  • Scala 3: 型安全で表現力豊かな言語
  • Protocol Buffers: 効率的なシリアライゼーション
  • GraphQL: 柔軟なクエリAPI
  • AWS Lambda: サーバーレスなイベント処理
  • LocalStack: ローカル開発環境

CQRS/Event Sourcingは、大規模で複雑なシステムに適したパターンです。小規模なシステムでは過剰設計になる可能性があるため、プロジェクトの要件に応じて採用を検討してください。

次のステップ

実際にこのプロジェクトを動かしてみたい方は、README.mdに詳細な手順が記載されています。

質問やフィードバックがあれば、GitHubのIssueでお気軽にどうぞ!