日々スマートにお買い物をするため「PayPay」を活用するお客様が増えており、決済の種類も数もどんどん増えています。さて、エンジニアリングの観点から見ると、決済数の増加によって影響を受けるものとして、「ペイアウト(加盟店への支払)サブシステム」があります。ペイアウトサブシステムは、決済時はもちろんのこと、決済の取り消しや返金、還元クーポン適用時などの幅広い処理において加盟店手数料の計算を行います。PayPayの加盟店数は今では300万を超え、加盟店の規模も業種も多岐に渡るため、内部システムでの資金の移動や会計レポートの作成、突合ファイルの生成などを行う際、加盟店独自の要件に合わせる必要があります。
バックエンドでは、これらの処理のほとんどがバッチで実行され、加盟店IDや決済IDなどカーディナリティの高い値を使いながら高度に並列化されています。しかし、最近PayPayでは2つの大きな問題に直面しました。ひとつは加盟店ごとに行う決済や手数料、税額の集計における問題、もうひとつは加盟店からの要望に合わせて作成する突合ファイル生成の問題でした。今回はまず、前者の「集計の問題」を取り上げたいと思います。後者の「突合ファイル」については、次回以降のブログで詳しくお伝えします。
スケーラビリティ問題の分析
PayPayの日々の取扱高、手数料、税額を集計し、内部システムで適切に資金移動を行うことを「リリースプロセス」と呼んでいます。以前使っていたペイアウトシステムはワーカーノードへの負荷の分散をKafkaのパーティショニングに依存し、それぞれのワーカーノードは合計を計算した上でAmazon RDSテーブルに結果を保存していました。リリーストリガーサービスが加盟店ごとのリクエストをパブリッシュしてワーカーにコンシュームさせ、各ワーカーが加盟店ごとに集計していました。そうするとどのような問題が発生するか、勘の鋭い方はもう察しがついているかもしれませんが、「1日の決済回数がすべての加盟店で同一ではない」という点が問題になります。
1日の決済回数が中央値をはるかに超える「ハイトラフィック」なPayPay加盟店(ほとんどが大手国内チェーン)は比較的少数です。これらの加盟店は各々1つのワーカーに割り当てられ、そのワーカーがその加盟店のすべての決済をページングしながらDBを参照し、ローカルメモリ上で合計値を集約します。プロセスにはたくさんのステップがあり、4時間30分以内に全てを完了する必要があるのに対し、最後の合計値の集約に2時間以上かかってしまいます。これは明らかにボトルネックであり、PayPayの急速な拡大に伴い、さらに問題が悪化することが懸念されました。
図1:過去に使用していた日次のリリースプロセスのアーキテクチャ
新たなアーキテクチャ
私たちは改善すべき点が2つあることに気づき、DynamoDBに保存されたデータをSparkで処理するという方法を初めてPayPayに導入することにしました。DynamoDBは、AWSが提供し、広く使われているワイドカラム型NoSQLデータストアであり、特定のユースケースやパーティションキーの正しい選択により、読み取りと書き込みの両方をスケーリングすることができます。決済履歴のような大きなデータを格納するためにDynamoDBを選びましたが、迅速な処理のために日毎の決済を個別に保存するテーブルも別途作成することにしました。そのようにしなかった場合、日付をプライマリーキーとするグローバルセカンダリインデックスが必要となります。これはMapReduceではよく見られる問題で、問題解決になるべく無駄な労力を割かない方法をとりました。
そこで出てくるのがSparkとAmazon EMRです。Sparkの動作について詳細な説明は割愛しますが、SparkはMapReduceのパラダイムに基づき、Hadoopの限界を克服するため多くの改善点を導入しています。私たちが興味を持ったのは、宣言型のインターフェースを使ってネイティブに処理を分割することができる点、分散コンピューティングの細かい部分を省いた分かりやすいコードが得られる点、データソースAPIを使って様々なストレージシステムをサポートする拡張性がある点です。特に2点目が重要であり、理由としてはDynamoDBに接続するための既成のソリューションをSparkが提供していないことと、AWSがSpark用の公式DynamoDBコネクタをサポートしていないことが挙げられます。
図2:日次のリリースプロセスの新アーキテクチャ
注:上の図はSparkのエグゼキューターとDynamoDBのパーティションを1対1でマッピングし、単純化しています。
DynamoDBの最適化
DynamoDBはテーブルを複数のノードに分割して利用するクラスター型のアーキテクチャを採用しているため、1度に1つのパーティションしか利用されず、データを順番に読み込むので無駄が発生します。独立して動作するSparkノードのクラスターと、独立した処理が可能なワークロードが既に存在するので、DynamoDBの分散性を活用するためには、合計処理時間を短縮することが不可欠ですが、DynamoDBは複数のワーカーを使ってテーブル全体をフルスキャンする機能を提供しているので、これを実現することができます。詳細はこちら
スキャンリクエストにセグメントの総数と現在のセグメント番号を加えることで、複数のパーティションに負荷を分散し、テーブルのプロビジョニングされた読み取りスループットを全て利用できます。DynamoDBコネクタを通じて、Sparkクラスター内の各コアにセグメントが1つ以上割り当てられています。これによりテーブルの読み取りキャパシティユニット(RCU)の使用量を飽和させ、スロットルを最小限に抑えることができます。
Sparkの最適化
では、Sparkはどのようにして重い処理に邪魔されず計算を分散させているのでしょうか?私たちがデータセットに対して”partitionBy”を呼び出さない限り、Sparkはヒューリスティクスに基づいて決済を各タスクに分配します。つまり、私たちから見ると、全てのタスクが全ての加盟店の決済を拾うチャンスがあると言えます。これは以前とは真逆のアプローチです。各タスクが”groupBy“に基づいて決済を収集し、Sparkは集約されたデータをエグゼキューター間でシャッフルする最適な方法を決めて、最後にもう一度集約を行い、最終結果をDynamoDBの別のテーブルに保存します。このような分散型の計算パラダイムは「Map-Side Reduction」と呼ばれ、特に”groupBy”を使用した時にいくつかのグループが中央値に比べて大きい場合、実行時間を大幅に最適化できます。
結果分析
先に述べた通り、以前のシステムは大手加盟店の負荷に完全に打ちのめされ、最後に記録した集計完了までの時間は2時間30分弱でした。そこから、PayPayの1日あたりの決済件数はほぼ2倍に拡大し、さらに国内の大手コンビニチェーンの自社アプリにPayPayの決済機能が搭載された結果、加盟店の決済分布の上位値にさらに大きな不均衡が生じるようになりました。現在、コードベースやインフラを簡素化したことでリリースの集計処理にかかる時間は2分33秒となり、約60分の1に短縮しました。図3にSparkのクエリを可視化していますが、アプリケーションの中で”HashAggregate”演算子を呼び出す段階は主に2つあることがわかります。1つ目はMap-Side Reductionで、それに続く”Exchange“ブロックで最終的な削減に備えエグゼキューター間で中間結果をシャッフルします。最初の”HashAggregate“演算子はデータ量を98.2MiBに削減し、エグゼキューター間では約15MiBの転送だけで済むため、数ミリ秒で完了します。2つ目の”HashAggregate“演算子はさらに軽量化され、集約されたデータをDynamoDBのアイテムとして保存する”AppendData“の呼び出し時間を入れても、最も長いタスクで8秒しかかかりません。
図3:Sparkのクエリの可視化
結論
新アーキテクチャを使い始めて7ヶ月が経ちました。この良い結果を受け、ペイアウトサブシステムの他のバッチ処理もSparkに移行することになりましたが、これについてはまた別の機会に。PayPayが直面した他の課題をどのように解決してきたかについても今後ご紹介していきますので、お楽しみに!