As our product becomes more popular, the number of transactions that we handle daily keeps increasing. One of the services at PayPay that is affected by this rapid rise in the number of transactions is the payout subsystem. Payout subsystem covers a wide range of processes from calculating commissions for transactions, cancellations, refunds for a variety of merchants with different requirements to calculating total amounts for internal wallet movements to generating a variety of reports for accounting and reconciliation. Most of these operations are run as batches and they are highly parallelizable thanks to being dependent on high-cardinality terms such as merchant IDs or even transaction IDs. Nonetheless, we were facing scalability issues with our old system particularly in two operations: aggregation of the transaction,
commission, tax amounts per merchant, and generation of reconciliation reports for the merchants requesting them. In this blog post, we will be focusing on the former, leaving the latter for another post in the future.
An analysis of the scalability issues
The calculation of the total transaction, commission, and tax amounts daily is called the release process. In the old payout system, we were relying on Kafka’s partitioning to distribute the load across many worker nodes that would be calculating the totals and saving the results to a table in Amazon RDS. The release trigger service would push requests per each merchant where they would be consumed by workers and the aggregation would be performed per merchant by each worker. You may already start to imagine what kind of issues this could entail. The problem is that not all merchants are equal in terms of the number of daily transactions. There are a few high traffic merchants that yield daily transactions way over the median number across all merchants. These merchants would each get assigned to a single worker which was now responsible for making DB lookups by paging over all of their transactions and aggregating the totals locally in memory. For the tail end, this would end up taking upwards of 2 hours while we are allowed 4 hours and 30 minutes to complete the entire process which includes numerous other steps. It was clearly a bottleneck and was only going to get worse with the rapid expansion of the service in Japan with more transactions incoming every passing day.
Figure 1: Our previous architecture for daily release calculations
The new architecture
We saw two points for improvement and brought in a company-first use of processing data stored in DynamoDB with Spark. DynamoDB is the venerable wide-column NoSQL datastore from AWS that is capable of scaling both reads and writes well with certain use cases and correct selection of partition keys. We chose DynamoDB to store data that is particularly large like transaction records but also we opted to store transactions each day to separate tables for quick processing which would otherwise necessitate a global secondary index with the date as
the primary key. As for the issue described above, it is a very common MapReduce problem and we chose not to reinvent the wheel to solve it:
Enter Spark and Amazon EMR. While a detailed description of how Spark operates under the hood is out of the scope of this article, it has its foundations on the MapReduce paradigm and introduces a multitude of improvements on Hadoop to overcome some of its limitations. The thing we were interested in was its ability to partition operations natively using a declarative interface, resulting in expressive code that does not deal with the nitty-gritty of distributed computation, and its extensibility to support different storage systems through its data source API. The second point is important because Spark does not provide an out-of-the-box solution for connecting to DynamoDB and neither does AWS support an official DynamoDB connector for Spark.
Figure 2: Our new architecture for daily release calculations.
Note that in the above diagram, one-to-one mapping between the Spark executors and the DynamoDB partitions is a simplification.
As DynamoDB has a clustered architecture where tables are partitioned across multiple nodes, it is wasteful to read data in a sequential manner as only one partition will be utilized at a time. As we already have a cluster of Spark nodes that can work independently and a workload that allows such independent processing, it is essential to reduce our total processing time to make use of the distributed nature of DynamoDB. DynamoDB supports this idea via providing the capability to make a full table-wide scan using multiple workers. (Details are described here.) By including the total number of segments and the current segment number in the scan request, it is possible to distribute the load across multiple partitions and utilize the total provisioned read throughput of a table. Through our DynamoDB connector, we assign one or more segments per each core in the Spark cluster. This allows us to saturate our usage of the table’s read capacity units (RCUs) while getting throttled as little as possible.
So, how does Spark distribute the computation such that it does not get bottlenecked with the heavy hitters? The way Spark distributes the transactions across each task is based on heuristics unless we take the reins and call “partitionBy” on our dataset. This means from our perspective, every task has a chance of picking up transactions of any merchant. This is exactly the opposite of our previous approach. We let each task to collect transactions based on our “groupBy” directive and let Spark determine the best way to shuffle the aggregated data across executors, finally doing a second aggregation and saving the final results to a different table in DynamoDB. This distributed computation paradigm is called Map-Side Reduction and provides drastic optimization of execution time particularly when using “groupBy” function when some of the groups tend to be much larger compared to the median size.
An analysis of the results
As discussed earlier, the existing system was completely getting crushed by the load from the top merchants, the last logged time to complete aggregation is a little shy of 2 hours and 30 minutes. From that point, we almost doubled the daily transactions and got one of the major convenience store chains in Japan to accept PayPay payments through their own app, resulting in even a higher skew at the high end of the transaction distribution across merchants. Currently, the release aggregation process takes 2 minutes and 33 seconds, a reduction of
roughly 60 times, all the while simplifying our codebase and infrastructure. In figure 3, the query visualization of Spark is shown. We can see that there are two main stages in the application where “HashAggregate” operators are invoked. The first one of these is Map-Side Reduction and it is followed by an “Exchange” block which shuffles the intermediary results between the executors to prepare for the final reduction. The first “HashAggregate” operator reduced the data volume to 98.2 MiB where only about 15 MiB’s needed to be transferred across executors which was completed in single-digit milliseconds. Furthermore, the second “HashAggregate” operator is much lighter with the longest running task taking 8 seconds including the “AppendData” invocation which saves the aggregated data as DynamoDB items.
Figure 3: Query visualization of the Spark application
We have been running this new architecture in production for over 7 months. The results encouraged us to move other batch processing parts of the payout subsystem to Spark as well which we reserve for a separate discussion in the future. For that next chapter and our solutions to other challenges we faced, stay tuned.