Amazon EventBridge Pipes

SQS、Kinesis、DynamoDB Streams などのソースからターゲットへのポイントツーポイント統合をフィルタリング・変換付きで構築するサービス

概要

Amazon EventBridge Pipes は、イベントソースからターゲットへのポイントツーポイント統合をコードなしで構築するサービスです。SQS、Kinesis Data Streams、DynamoDB Streams、Amazon MSK、セルフマネージド Kafka をソースとし、フィルタリング、エンリッチメント、入力変換を経てターゲットに配信する 4 段階のパイプラインを定義できます。Lambda や Step Functions でカスタムロジックを挟むことも可能で、従来は Lambda 関数で実装していたイベント連携パターンを宣言的に構築できます。

ソース・フィルター・エンリッチメント・ターゲットの 4 段パイプライン

EventBridge Pipes のパイプラインは、ソース、フィルタリング、エンリッチメント、ターゲットの 4 段階で構成されます。ソースはイベントの発生元で、SQS キュー、Kinesis Data Streams、DynamoDB Streams、Amazon MSK トピック、セルフマネージド Kafka トピックに対応しています。フィルタリングはイベントパターンマッチングで不要なイベントを除外する段階で、EventBridge のイベントパターン構文を使用します。例えば DynamoDB Streams から INSERT イベントのみを抽出したり、SQS メッセージの特定フィールドが条件に合致するものだけを通過させたりできます。フィルタリングで除外されたイベントは課金対象外となるため、コスト最適化にも直結します。エンリッチメントは任意のステップで、Lambda 関数、Step Functions ステートマシン、API Gateway、API Destination を呼び出してイベントデータを補完・変換します。ターゲットは最終的な配信先で、Lambda、Step Functions、SQS、SNS、Kinesis、EventBridge イベントバス、ECS タスク、API Gateway など 15 以上のサービスに対応しています。

バッチ処理とエラーハンドリング設計

EventBridge Pipes はソースからイベントをバッチで取得し、効率的に処理します。バッチサイズ (1〜10,000) とバッチウィンドウ (最大 300 秒) を設定でき、バッチサイズに達するかバッチウィンドウが経過した時点でパイプラインが起動します。Kinesis や DynamoDB Streams をソースとする場合は、並列化係数 (Parallelization Factor) を設定してシャードあたりの同時処理数を増やせます。エラーハンドリングでは、ソースの種類に応じた再試行動作を理解することが重要です。SQS ソースの場合、処理に失敗したメッセージは可視性タイムアウト後にキューに戻り、最大受信回数を超えるとデッドレターキュー (DLQ) に移動します。Kinesis や DynamoDB Streams ソースの場合は、失敗時の再試行回数、レコードの最大経過時間、失敗時の分割 (Bisect on Function Error) を設定できます。障害が発生したバッチを半分に分割して再処理する Bisect 機能は、バッチ内の特定レコードが原因で全体が失敗するケースに有効です。失敗レコードの送信先として SQS キューや SNS トピックを指定し、後続の調査・再処理に備える設計が推奨されます。

イベント変換と InputTemplate の活用

EventBridge Pipes の入力変換 (Input Transformation) は、ソースから受け取ったイベントデータをターゲットが期待する形式に変換する機能です。InputTemplate は JSON テンプレートで、ソースイベントのフィールドを変数として参照し、ターゲットに渡すペイロードを自由に構成できます。例えば DynamoDB Streams の NewImage から必要なフィールドだけを抽出し、フラットな JSON に変換してから SQS に送信するといった処理を、Lambda 関数なしで実現できます。変数の参照には JSONPath 風の構文 (<$.detail.orderId>) を使用し、静的な値と動的な値を組み合わせたテンプレートを定義します。エンリッチメントの出力結果もテンプレート内で参照でき、ソースイベントとエンリッチメント結果を統合した新しいペイロードを構築できます。実務では、InputTemplate を活用してターゲットサービスの API 仕様に合わせたペイロード変換を行い、中間の Lambda 関数を排除するパターンが効果的です。Lambda を挟まないことで、コールドスタートのレイテンシ、Lambda の同時実行数制限、追加のコストを回避でき、シンプルで信頼性の高い統合を実現できます。

共有するXB!