PayPay uses different data storage solutions in its microservices, such as SQL databases, NewSQL databases, key-value databases, file systems, etc. The DaaS (Data as a Service) team is therefore responsible for building a DWH (data warehouse) that provides integrated data for users to perform data analysis.
We built a batch data extraction application for the different data sources using Spark. Like many traditional approaches, it pulls data incrementally based on a timestamp, deduplicates it, and writes it to AWS S3; after regularization, the data is uploaded to Google BigQuery. With this approach we had to create an RO (read-only) replica instance in each database cluster to serve extraction and avoid impacting the performance of the RW (read-write) instance. In the early days, when data volumes were small, this solution covered almost all of our use cases.
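The timestamp-based extraction above can be sketched as follows. The table and column names (`orders`, `updated_at`) and the watermark handling are illustrative assumptions, not our actual schema:

```python
from datetime import datetime

def build_incremental_query(table: str, ts_column: str, watermark: datetime) -> str:
    """Build the SQL used to pull rows changed since the last extraction.

    Rows with ts_column >= watermark are re-read and deduplicated
    downstream, since a replica may apply binlog events late.
    """
    return (
        f"SELECT * FROM {table} "
        f"WHERE {ts_column} >= '{watermark:%Y-%m-%d %H:%M:%S}'"
    )

def deduplicate(rows, key_field, ts_field):
    """Keep only the newest version of each primary key."""
    latest = {}
    for row in rows:
        key = row[key_field]
        if key not in latest or row[ts_field] > latest[key][ts_field]:
            latest[key] = row
    return list(latest.values())
```

Note that this scheme can only see rows that still exist; a row deleted at the source simply stops matching the query, which is exactly the blind spot described below.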
As data volumes grew, binlog synchronization between the RO replicas and the RW instances became slower and lagged more often, which made timestamp-based extraction unreliable. We had to spend a great deal of time and effort fixing database synchronization problems, as well as consistency problems between the DWH and the source databases. Moreover, with this approach we could not identify rows that had been deleted, which had a considerable impact on downstream data analysis.
To solve these problems, we decided to build an incremental-update data pipeline that reads the binlog generated by the RW instance. Because the binlog is serial and atomic, we no longer need to worry about the inconsistencies caused by timestamps at the data layer. And because the binlog is produced directly by the RW instance, we no longer depend on RO replicas to obtain data, which makes acquisition simpler and more efficient. In addition, the binlog records row-level delete operations, so we can tell which data has been deleted at the source.
We compared Delta Lake and Apache Hudi, both of which support building ACID, CDC-based incremental data solutions on Hadoop. Since we preferred a write-optimized architecture, Apache Hudi, which supports a write-optimized mode known as MOR (Merge on Read), became our final choice.
The basic workflow of Hudi can be summarized as follows:
- Data ingest
- Calculate index and generate base file relations
- Generate parquet and avro files
- Compaction and cleaning
1. Data ingest
We use Apache Kafka as the message queue for the binlog, and we ensure that records with the same primary key always go to the same Kafka partition, so the data order is trusted at the partition level. For performance we use at-least-once delivery, and resolve the resulting duplicates in subsequent data processing.
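The per-key ordering guarantee comes from keyed partitioning. Kafka's producer does this with murmur2 over the message key; the simple deterministic hash below is a sketch of the same idea, not Kafka's actual algorithm:

```python
import hashlib

def partition_for(primary_key: str, num_partitions: int) -> int:
    """Map a record's primary key to a Kafka partition.

    Any deterministic hash gives the guarantee we rely on: records
    sharing a primary key always land in the same partition, so their
    relative order is preserved within that partition.
    """
    digest = hashlib.md5(primary_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```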
We use the Kafka offset together with the sequence information in the binlog event to determine the final record to write to Hudi; this step ensures that no duplicate data reaches Hudi.
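The deduplication step can be sketched like this; the field names (`pk`, `kafka_offset`, `binlog_seq`) are illustrative, not our actual event schema:

```python
def latest_events(events):
    """Pick the final event per primary key before writing to Hudi.

    With at-least-once delivery the same binlog event can appear more
    than once; the (kafka_offset, binlog_seq) pair gives a total order
    per key, so keeping the maximum drops both exact duplicates and
    stale versions of a row.
    """
    chosen = {}
    for ev in events:
        order = (ev["kafka_offset"], ev["binlog_seq"])
        key = ev["pk"]
        if key not in chosen or order > (chosen[key]["kafka_offset"],
                                         chosen[key]["binlog_seq"]):
            chosen[key] = ev
    return list(chosen.values())
```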
2. Calculate index and generate relations with base files
Hudi generates an index based on the user-defined record key and partition fields. The partition fields give us the partition location, similar to Apache Hive, and thanks to the nature of the bloom filter, the index lets us quickly determine which file slice of the partition a record should be saved to.
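The role of the bloom filter can be illustrated with a minimal sketch: one filter per base file over its record keys, used to narrow down which files might already contain a key. This is an illustration of the principle, not Hudi's actual implementation:

```python
import hashlib

class BloomFilter:
    """A minimal bloom filter, one per base file, over its record keys."""

    def __init__(self, size=1024, hashes=3):
        self.size, self.hashes, self.bits = size, hashes, bytearray(size)

    def _positions(self, key):
        for i in range(self.hashes):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:4], "big") % self.size

    def add(self, key):
        for p in self._positions(key):
            self.bits[p] = 1

    def might_contain(self, key):
        # No false negatives; occasional false positives.
        return all(self.bits[p] for p in self._positions(key))

def candidate_files(record_key, file_filters):
    """Return file slices whose bloom filter may already hold the key.

    An empty result means the key is definitely new (an insert);
    false positives are resolved by actually reading the candidates.
    """
    return [f for f, bf in file_filters.items() if bf.might_contain(record_key)]
```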
3. Generate parquet and avro files
Binlog events contain three kinds of operations: Insert, Update, and Delete. To keep deletions visible to subsequent pipeline stages, we handle only Insert and Update directly; a Delete is converted into an Update that sets the internal column 'is_deleted' to true. We generally use the upsert operation: it consults the index (bloom filter) to determine whether a record is a new insert. If so, the record is written to a new parquet file; otherwise, an avro file is generated in the same file slice. To record the operations applied to the data, Hudi provides a complete timeline design, and each write operation is recorded in the timeline.
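The delete-to-update conversion and the upsert routing can be sketched as follows; the event shape and column name `is_deleted` follow the convention described above, while the function names are illustrative:

```python
def to_upsert(event):
    """Normalize a binlog event into an upsert record.

    Deletes are not passed through as physical deletes; the row is kept
    and the internal 'is_deleted' column is set to True, so downstream
    consumers can still see that the row existed and was removed.
    """
    row = dict(event["row"])
    row["is_deleted"] = event["op"] == "DELETE"
    return row

def target_format(is_new_record: bool) -> str:
    """Upsert routing: new keys get a fresh parquet base file; existing
    keys get their changes appended as an avro log in the same file slice."""
    return "parquet" if is_new_record else "avro"
```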
4. Compaction and cleaning
This step is the most important in Hudi. Although Hudi provides a snapshot query mode that reads avro and parquet together, those reads pay a deduplication cost, so compacting avro into parquet gives better query performance. Hudi uses the timeline to figure out which file slices need compacting, not only merging the avro update data but also merging small parquet files into a new parquet file of a target size. Since we use MOR mode, to reduce the performance impact of compaction we configured it to run after every N write operations.
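In Hudi this "compact after every N writes" behavior is driven by configuration rather than custom code. A hedged sketch of the relevant writer options (option names per the Hudi documentation of that era; N = 5 here is purely illustrative, not our production value):

```python
# Hudi writer options passed to the Spark datasource (a sketch, not our
# full production configuration):
hudi_options = {
    # MOR gives the write-optimized mode discussed above.
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    # Run compaction inline as part of the write job...
    "hoodie.compact.inline": "true",
    # ...after every N delta commits (illustrative N).
    "hoodie.compact.inline.max.delta.commits": "5",
}
```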
After compaction, Hudi cleans up by deleting files that are no longer used. Cleaning operations are recorded in the Hudi timeline as well.
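One way to picture cleaning is a "keep the latest N file versions" policy per file group; this is a simplified sketch of that idea, not Hudi's cleaner implementation:

```python
def files_to_clean(file_versions, retain):
    """Given the file-slice versions per file group, return the older
    versions the cleaner should delete, keeping the newest `retain`
    versions of each group so in-flight queries can still finish.
    """
    doomed = []
    for group, versions in file_versions.items():
        ordered = sorted(versions, reverse=True)  # newest first
        doomed.extend((group, v) for v in ordered[retain:])
    return doomed
```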
In actual operation, migrating the existing data into the Hudi-based DWH was also an important task. Because the previous data pipeline had many historical problems, we ultimately re-migrated all the data using JDBC on Spark together with our platform-based data extraction function.
When we decided to adopt Apache Hudi, it was still incubating and we encountered many problems, but we solved most of them by communicating with the Hudi community and contributing code for some of the features. This helped us understand Hudi's workflow and improve our own implementation.
Through the DaaS team's efforts, we now have 400+ data tables from file systems, MySQL, TiDB, and DynamoDB running on the Hudi-based data pipeline. Since we use Spark Structured Streaming to run micro-batches one at a time, we set the job interval to 5 minutes by default, and most micro-batches complete everything from ingestion to compaction within that time. This reduced our data latency from hours to minutes, which is a leap forward.
Hudi natively supports data queries from Apache Presto, Apache Spark, Apache Hive, etc., which makes it easy for other teams to use our data. Starting from September 2020, AWS Athena also supports querying Hudi natively: you only need AWS Glue or some simple schema-sync code to expose and use Hudi tables in Athena.