PayPayではマイクロサービスにおいて、SQLデータベース、NewSQLデータベース、KVデータベース、ファイルシステム等、様々なデータストレージソリューションを使っています。そこで、DaaS(Data as a Service)チームがDWHを構築し、ユーザーがデータ分析に使う統合データを提供しています。

私たちはバッチアプリケーションであるSparkを使用し、様々なデータソースからデータを抽出するアプリケーションを構築しました。従来のアプローチと同じようにデータを徐々に取り出し、データの重複を排除し、タイムスタンプに基づいてAWS S3に書き出します。次に、データを正規化した後、Google BigQueryにアップロードします。このアプローチでは、各データベースクラスタにRO(読み取り専用)レプリカインスタンスを作成してデータを抽出し、RW(読み取り/書き込み)インスタンスのパフォーマンスへの影響を避ける必要があります。データ量が少なかった初期の頃は、この方法でほぼすべてのユースケースを解決することが可能でした。

データ量の増加に伴い、ROレプリカインスタンスとRWインスタンス間のbinlog同期が頻繁かつ低速になり、タイムスタンプベースの抽出の信頼性が低下しました。データベースの同期問題を修正するために多くの時間と労力を費やす羽目になり、また、データベースが同時に存在する問題やDWHとデータベース内のデータ整合性問題の修正にも多くの時間と労力がかかりました。このアプローチでデータを読み込むと削除したデータを特定することができず、その後の数値分析に大きな影響を与えるという問題も出てきました。

上記のそれぞれの問題を解決・改善するために、RWインスタンスが生成したbinlogを読み取り、増分更新データパイプラインを構築することにしました。binlogには連続性と原子性があるため、データ層のタイムスタンプによるデータの不整合を気にする必要がなくなります。また、binlogはRWインスタンスから直接生成されるため、データ取得にROレプリカを使用する必要がなく、データ取得がよりシンプルで効率的になります。さらに、binlogは行に対する削除操作を記録します。これにより、データソースからどのようなデータが削除されたがわかります。

Delta DataLakeとApache Hudiを比較してみました。どちらもHadoopベースのCDCインクリメンタルビルドACIDデータソリューションをサポートしています。私たちは書き込みを最適化したアーキテクチャーを構築したいと考え、書き込み最適化モード(別名「MOR」-Merge On Read)をサポートしているApache Hudiを使うことに決めました。

Hudiの基本的なワークフローを簡単にまとめると、次のようになります。

  1. データの取り込み
  2. 事前結合
  3. インデックスを計算し、ベースファイルのリレーションを生成
  4. parquetファイルとavroファイルを生成
  5. 圧縮とクリーニング

1.データの取り込み

Apache kafkaをbinlogのメッセージキューとして使用して、同じ主キーを持つデータが常に同じkafkaパーティション内に存在するようにし、パーティションレベルでのデータ順序の信頼性を確保します。パフォーマンスを考慮し、設計には最低1モードを用い、その後のデータ処理でデータ重複問題を解決しています。

2.事前結合

kafkaのオフセットとbinlogイベント内のシーケンス情報を使用して、Hudiに書き込まれる最終データを決定します。ここで、Hudiに書き込む前にデータの重複を防ぐことができます。

3.インデックスを計算し、ベースファイルのリレーションを生成

Hudiはユーザー定義のレコードキーとパーティションフィールドに基づいてインデックスを生成します。Apache Hiveのように、パーティションフィールドによってパーティションの位置を取得できます。ブルームフィルタの性質上、インデックスを使用することで、データを保存するパーティションのどのファイルスライスに保存する必要があるかを簡単に判別できます。

4.parquetファイルとavroファイルを生成

Binlogイベントには、Insert、Update、Deleteの3種類のオペレーションが含まれますが、後続のデータパイプラインオペレーションのDelete操作と区別するために、InsertとUpdateのみを考慮します。Delete操作は、内部列の「is_deleted」の指定されたフィールドの値をtrueに設定するように変換されるため、Update操作となります。通常はupsert操作を使用してデータが新しい挿入かどうかを判断するためにindex(bloomフィルター)を渡し、yesの場合は新しいparquetファイルに書き込み、そうでない場合は同じファイルスライスにavroファイルを生成します。データに対するオペレーションを記録するためにHudiは完全なタイムライン設計を提供し、すべての書き込み操作をタイムラインに記録します。

5.圧縮およびクリーニング

ここがHudiで最も重要なステップです。というのも、Hudiはavroの読み取りとparquetの読み取りを同時にサポートするスナップショット・クエリメソッドを提供していますが、読み取りは重複を除外する際にパフォーマンスを消費するため、avroをparquetに圧縮することでクエリパフォーマンスが向上します。Hudiではavroで更新データを保存する時や、小さなparquetファイルを特定のサイズの新しいparquetファイルにマージする際に、どのファイルスライスを圧縮する必要があるかをタイムラインで判断します。私たちはMORモードを使用しているので、圧縮に起因するパフォーマンス問題を低減するために、圧縮の動作をN回の書き込みオペレーション毎に実行するように設定しています。

圧縮操作の後、Hudiはファイルをクリーンアップし、使用されなくなったファイルを削除します。クリーニングのオペレーションもHudiのタイムラインで記録します。

実際の運用プロセスでは、既存のデータをどのようにHudiベースのDWHに移行するかも重要なポイントとなります。従来のデータパイプラインに多くの問題があったため、最終的にSpark上のJDBCを使用することにし、プラットフォームベースのデータ抽出機能を使用して、すべてのデータを再移行することにしました。

Apache Hudiを使うことが決定した時、Hudiはまだ開発段階だったため、多くの問題に遭遇しましたが、Hudiコミュニティとコミュニケーションをとりながらいくつかの機能のコードを提供することで、問題の大部分を解決することができました。これにより、Hudiのワークフローだけでなく、私たち側のプログラミングの改善方法の理解にも寄与しました。

DaaSチームの取り組みを通じて、Filesystem、MySQL、TiDB、DynaMoDBの400以上のデータテーブルをHudiベースのデータパイプライン上で実行することに成功しました。マイクロバッチを1個ずつ実行するためにSpark Structured Streamingを使用したので、デフォルトでジョブ間隔を5分に設定し、ほとんどのマイクロバッチが指定時間内に取り込みから圧縮までのすべてのオペレーションを完了させることができました。これにより、データ遅延を数時間から数分に短縮することができ、飛躍的な進歩を遂げています。

Hudiが、Apache Presto、Apache Spark、Apache Hiveなどからのデータクエリをネイティブでサポートしていることにより、私たちのデータを、他のグループが利用しやすくなっています。また、2020年9月からは、AWS Athenaも、AWS Glueまたはsimple schema同期コードを使用するだけで、AthenaでHudiのテーブルを表示および使用できます。