
BigDB - an ad data pipeline for LINE

Jongkyu Lim, Joonghoon Shin 2017.07.12

Jongkyu Lim: Jongkyu Lim is responsible for processing ads data at LINE. Joonghoon Shin: Joonghoon Shin develops the ads data platform at LINE.

Before we begin

Hello, we are Jongkyu Lim and Joonghoon Shin, responsible for processing ads data and developing the ads data platform at LINE. In this blog post, I'd like to talk about BigDB: a big data processing pipeline for LINE ads. I'll go into detail about how BigDB came to be, what it is, how it's structured, what it does, and what use cases it has.

How BigDB came to be

As the services we gather data from keep growing, the amount of data grows as well, making it difficult to use it properly for analysis. Up until now, we've tackled this problem with several open-source solutions that support big data. While using these solutions, each with its own strengths and weaknesses, we felt the need for a simpler, more standardized way to collect, process, and look up data. That is where the idea for BigDB began; the key features we had in mind are introduced below.

Introducing BigDB

What is BigDB

BigDB is a big data processing pipeline for LINE ads that can collect, reprocess, and look up data. LINE ads are analyzed in two ways: the first is in real time, whenever an ad is shown to a user; the second is batch analysis, where events are collected and analyzed after a set time, such as hourly or daily. BigDB flexibly provides the data used for analysis, combining time series and static data when necessary.

Below are some features of BigDB that I'll go into more detail later in the blog post.

  • REST CLI (Command Line Interface) for easier management and support.
  • Flow control: creating HiveContext tables from streaming data and static data through the creation and management of schemas, and looking up data in the format requested by the user.
  • Multi-session support for reading and writing data.
  • Spark SQL-based real-time joining of user-requested data before analysis.
  • Two separate repositories that are flexibly assigned according to the scale of the user-requested data.

Structure

Solution structure

As you can see in the diagram below, BigDB adopts many open-source solutions. We originally used all-in-one tools, but those tend to include components that we rarely or never used. By removing these unnecessary components, we were able to allocate node resources where they are most needed, and from a management standpoint, doing so greatly reduced management overhead. While we were able to compensate for the parts the individual solutions lacked through customization, customization often causes dependency problems later on when you need to upgrade or migrate. That's why we developed the BigDB Core and API to act as a control center that regulates the flow between these different solutions, which allowed us to use the unique features of each solution without running into any major problems.

  • Message Proxy: Uses akka.http to collect JSON data and produce it to preset Kafka topics.
  • Kafka: Stores collected JSON data for 7 days. The number of partitions is set depending on the number of cores used by streaming.
  • Streaming: Uses Spark to perform tasks at a 5-second interval, adding data frames to the table assigned by the schema with "InsertInto" after a simple data conversion process (see the sketch after this list). Streaming is split into two, the one on the left being reserved for collected data tables, and the one on the right for tables joining original data with the source.
  • HDFS: Compresses collected JSON data and then stores it in Parquet format. The one on the left uses SSDs, and the one on the right uses SATA hard drives. With federation configurations, you can access each HDFS just by its namespace.
  • Spark: Zeppelin, Streaming, and BigDB act separately, using the Hive Metastore to share tables between sessions. We set each to use resources on the side where the tables (the actual data files) are located, for locality.
  • End Point: Web, Zeppelin, and the BigDB APIs use Spark's resources and tables. The end point on the right is mainly for aggregation and searching, and the one on the left is for performing scheduled tasks and searching.
  • BigDB Core/API: Uses akka.http for creating and managing schemas, performing query-based aggregation, and performing scheduled tasks. Provides a REST CLI using terminus.js. Result data is converted to CSV/TSV/JSON when sent to the end points.
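
To make the streaming flow above more concrete, here is a minimal sketch (not the actual BigDB code) of a 5-second Spark Streaming job that reads JSON messages from Kafka with the PreferBrokers location strategy and appends each micro-batch to a Hive-managed table with insertInto. The broker address, topic, and table name are placeholders.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers

object AdLogStreaming {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport registers the table in the shared Hive metastore,
    // making it visible to other sessions (Zeppelin, the BigDB API, and so on).
    val spark = SparkSession.builder()
      .appName("bigdb-ad-log-streaming")
      .enableHiveSupport()
      .getOrCreate()

    // 5-second micro-batches, matching the interval described in the post.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "kafka:9092",            // placeholder broker address
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "bigdb-streaming",
      "auto.offset.reset"  -> "latest"
    )

    // PreferBrokers schedules tasks on executors co-located with the Kafka brokers.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferBrokers, Subscribe[String, String](Seq("ad_log"), kafkaParams))

    stream.map(_.value()).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Parse the JSON messages and append them as a new partition with
        // insertInto; in practice the columns should first be selected in the
        // order defined by the target table's schema.
        val df = spark.read.json(rdd)
        df.write.insertInto("ad_log")                  // hypothetical table name
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because insertInto only appends new partitions, other sessions that share the metastore can query the table as it grows without refreshing it, which is the behavior described later in this post.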

System structure

We've divided the system into two clusters depending on the type of disk used. We use SSDs for their fast read/write speeds, but as they have limited storage, we use SATA hard drives for the larger nodes, since SATA hard drives offer more storage despite being slower than SSDs. Each data node that uses SATA drives is divided into 12 physical disks to make it as efficient as possible. The two node types use the same CPU, while the memory configuration differs between the two racks. The cluster using SSDs is generally used for long-term storage of small data after aggregation, while the cluster using SATA hard drives is used for long-term storage of large files such as raw data, or processed data that is almost as large as raw data. The two clusters were designed with horizontal scalability in mind. We were able to secure more usable memory by using the Parquet format, and any remaining memory is allocated to Elasticsearch so that it can process data needed for searching. As the data being searched is usually large, we use a 10Gb network to ensure shuffle performance. All solutions are configured to use local connections if possible.

  • Proxy: We kept the business logic minimal to reduce resource usage and to ensure better scalability (a minimal sketch follows this list).
  • Kafka: We've kept Kafka minimal and configured it on the same nodes as Spark and HDFS for better locality, using it as a buffer for data.
  • Spark & Spark Streaming: Used for lossless data processing from Kafka, ensuring locality through a PreferBrokers configuration. We add data frames with "InsertInto" in Streaming so that we can share the table data. Rather than caching data frames in memory, we opted to use the Parquet format to improve performance for queries on each column. We've set upper limits for memory on the nodes, so that some can be assigned to Spark and some to Elasticsearch.
  • HDFS: By enabling SCR (Short-Circuit Local Reads), we ensured efficiency on local connections. We've made path sharing between clusters easier with federation configurations. The two NameNodes are HA-configured in order to deal with outages. HDFS is divided into a cluster that uses SSDs and a cluster that uses SATA hard drives. We use SSDs for accumulating aggregated data, as SSDs have fast read/write speeds but limited storage. We set up 12 non-mirrored SATA hard drives on the other nodes to ensure better read/write efficiency. As the total storage is quite large, we expect to have enough storage for up to five years.
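
As referenced in the Proxy item above, the following is a minimal sketch of what such a message proxy could look like: an akka.http route that accepts JSON over HTTP and produces it to a Kafka topic. The endpoint path, broker address, and topic handling are assumptions, and validation against the schemas kept in ZooKeeper is omitted.

```scala
import java.util.Properties

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object MessageProxy extends App {
  implicit val system = ActorSystem("message-proxy")
  implicit val materializer = ActorMaterializer()

  // A single shared producer; the proxy itself stays almost logic-free.
  val props = new Properties()
  props.put("bootstrap.servers", "kafka:9092")   // placeholder broker address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  // Hypothetical endpoint: POST /collect/<topic> with a JSON body.
  val route =
    path("collect" / Segment) { topic =>
      post {
        entity(as[String]) { json =>
          producer.send(new ProducerRecord[String, String](topic, json))
          complete(StatusCodes.OK)
        }
      }
    }

  Http().bindAndHandle(route, "0.0.0.0", 8080)
}
```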

More details on the features

REST CLI (Command Line Interface)

With terminus.js, we've added a REST command line interface. You can press the tab key to see a list of available commands and instructions on how they can be used. You will most likely use this CLI to create or manage tables (schemas), or to register queries with the scheduler to create new tables. We plan to add more administrator-focused features such as node management later down the line.

Create Table [Schema]

Once you create a schema with the REST CLI, BigDB reads the data from the source assigned in the schema and stores it in the appropriate type of table. The schema data is kept in ZooKeeper, and Message Proxy and Spark reference the schema for validation and type conversion.

BigDB uses the partition data of the table assigned to the schema, and merges the numerous partitions created by Streaming at short intervals once a day, in the early hours of the morning. During testing, we only encountered performance issues when more than 800,000 partitions were created. As Streaming creates a partition every five seconds, we were able to circumvent this issue by merging the partitions every day.
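
As an illustration of that daily merge, the sketch below (with a hypothetical table name, partition column, and output path) reads one day's worth of small 5-second partitions and rewrites them as a handful of larger Parquet files.

```scala
import org.apache.spark.sql.SparkSession

object DailyPartitionMerge extends App {
  val spark = SparkSession.builder()
    .appName("bigdb-daily-merge")
    .enableHiveSupport()
    .getOrCreate()

  // Merge yesterday's data; assumes the raw table is partitioned by a "dt" column.
  val targetDate = java.time.LocalDate.now().minusDays(1).toString

  spark.table("ad_log")                        // hypothetical raw table fed by streaming
    .where(s"dt = '$targetDate'")
    .coalesce(8)                               // collapse thousands of tiny files into a few
    .write
    .mode("overwrite")
    .parquet(s"/bigdb/merged/ad_log/dt=$targetDate")   // hypothetical HDFS path
  // The real pipeline would then point the table's partition metadata at the
  // merged files and drop the small 5-second partitions.
}
```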

Support [Multi] Session

Real-time data inserted into the tables can be used from mutually exclusive Spark clusters, and the data can then be shared through Zeppelin, JDBC, the web, and so on. InsertInto is the only action that creates partitions during streaming, and it lets us check real-time data without running "refresh table" in another Spark search session.

[Realtime] Data Join

Data Join is a type of analysis. Online analysis in particular benefited greatly, as the ability to join multiple tables in real time reduced joining costs during the learning process. When time series data that grows in real time, static metadata, and user data are joined en masse, several inefficiencies can occur because the data is located in different places. We were able to minimize joining costs by joining data while the real-time data is at its smallest.
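
A minimal sketch of this idea, with hypothetical table and column names: each small micro-batch is joined with static data before it is written out, so the shuffle stays small.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Called from inside the streaming job for every 5-second micro-batch.
def enrich(batch: DataFrame)(implicit spark: SparkSession): DataFrame = {
  val meta = spark.table("campaign_meta")      // static metadata, refreshed separately
  // Broadcast the static side when it fits in memory; a larger static table
  // would fall back to a regular shuffle join instead.
  batch.join(broadcast(meta), Seq("campaign_id"), "left_outer")
}
```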

Support Input/Output Spec ([ ] : Beta Phase)

Supported output types are TSV, CSV, and JSON, which can be used with Table, Kafka, File, Elasticsearch, and the web. Kafka is set as the default input, but depending on the type, loaded data can be used directly from HDFS without going through Kafka. It's possible to define input/output when creating a schema using the BigDB API. When joining after creating a schema, we can set up queries on the scheduler and the intervals at which they will be performed using the BigDB API. We can also set up queries and intervals for aggregation. The query result data is sent to the appropriate HDFS depending on the size of the data.
Input + Join Target = Support Output
  • Input: Kafka, [DBMS]
  • Join Target: None, HDFS, Kafka
  • Support Output: HiveContext Table, Kafka, File (TSV, CSV), Elastic Search, For web (TSV, CSV, JSON)

Use Case

Looking up and analyzing data periodically

Internal users such as advertising and planning department employees mostly analyze and report on data divided by certain periods. Any commonly used data is displayed on the dashboard, which can be accessed through the web.
  • Input: Kafka (Realtime Advertisement Impression & Click Log Data)
  • Join Target: None
  • Support Output: JSON (For Dashboard web page)
  • Purpose: Real-time ad log messages are added every five seconds to the tables assigned by the created schema using InsertInto, and an aggregation table is then built from the accumulated real-time data by registering aggregation queries and the intervals at which they run.
  • Performance: We were able to store 10,000 to 100,000 records in under one second, every five seconds, and to search through three months' worth of accumulated data in a matter of seconds.

We can access the periodically aggregated data through the BigDB API on the dashboard webpage, in JSON format. The size of the data is greatly reduced as it's already aggregated, and we were able to display it on the web within a few seconds to under a minute. The dashboard is also used for ad-hoc queries and for browsing machine learning results.
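
As an example of the kind of registered aggregation query described above, the sketch below (with hypothetical table and column names) rolls the raw ad log up into hourly impression and click counts and appends the result to an aggregation table that a dashboard could read. A real scheduled run would restrict the query to the interval since the last run.

```scala
import org.apache.spark.sql.SparkSession

object HourlyAdAggregation extends App {
  val spark = SparkSession.builder()
    .appName("bigdb-hourly-aggregation")
    .enableHiveSupport()
    .getOrCreate()

  spark.sql(
    """
      |SELECT ad_id,
      |       date_format(event_time, 'yyyy-MM-dd HH:00:00') AS agg_hour,
      |       sum(CASE WHEN event_type = 'impression' THEN 1 ELSE 0 END) AS impressions,
      |       sum(CASE WHEN event_type = 'click'      THEN 1 ELSE 0 END) AS clicks
      |FROM ad_log
      |WHERE dt = current_date()
      |GROUP BY ad_id, date_format(event_time, 'yyyy-MM-dd HH:00:00')
    """.stripMargin)
    .write
    .mode("append")
    .saveAsTable("ad_log_hourly_agg")          // small aggregation table kept on the SSD cluster
}
```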

Ad-hoc querying

Internal users such as advertising, planning, and development department employees search the data they need for analysis by performing ad-hoc queries on the tables created by BigDB. Our engineers also use UDFs alongside ad-hoc queries to handle more complex business logic. This is mostly done using Zeppelin.
  • Input: Kafka (Realtime Advertisement Impression & Click Log Data)
  • Join Target: None
  • Support Output: Table (For Zeppelin)
  • Purpose: Schemas are created for data inputted in real time and the data is accumulated in a table so that we can run ad-hoc queries from Zeppelin. We can also register preset queries and accumulate their results in an aggregate table, so that we can run ad-hoc queries on the aggregated data.
  • Performance: We were able to store 10,000 to 100,000 records in under one second, every five seconds, and to run queries in a matter of seconds, depending on the type. As this uses up resources, we sometimes perform real-time joining and then reprocess the data when necessary.

Accumulated data is distinguished by unique partition keys, so the data can be used in multiple sessions without having to refresh the table. The number of partitions increases when the streaming interval is reduced, so we perform a daily merge to circumvent any potential problems caused by the increased partition count. We increased search efficiency by accessing the large raw data only for administrative purposes, and by performing aggregations only when necessary.
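
Since UDFs are used alongside ad-hoc queries, here is a minimal sketch of how that could look in a Zeppelin paragraph; the UDF logic and the table and column names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Classify a user agent string into a coarse platform bucket.
spark.udf.register("platform", (userAgent: String) =>
  if (userAgent == null) "unknown"
  else if (userAgent.contains("Android")) "android"
  else if (userAgent.contains("iPhone") || userAgent.contains("iPad")) "ios"
  else "other")

spark.sql(
  """
    |SELECT platform(user_agent) AS platform, count(*) AS impressions
    |FROM ad_log
    |WHERE dt = current_date()
    |GROUP BY platform(user_agent)
  """.stripMargin).show()
```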

Role as an online data joiner for analysis and prediction (beta phase)

We try various methods to provide our users with more useful ads, such as collecting events in real time and then analyzing them. The large amount of data we encounter during this process is turned into tables through BigDB. We increased analysis efficiency by handling the join in real time, as joining is one of the most time-consuming processes that occur during analysis.
  • Input: Kafka (Realtime Advertisement Impression & Click Log Data)
  • Join Target: HDFS (Daily User Demo Data)
  • Support Output: Kafka & Table
  • Purpose: To reduce joining costs during analysis by performing a real-time join of 5-second interval real-time ad log messages and daily aggregated user demo data, which contributes to online ad CTR (click-through rate) prediction.
  • Performance: We were able to store 10,000 to 100,000 records in under one second, every five seconds. Joining the streaming data with another 70,000,000 records took two to three seconds.

The cost of joining can increase when joining data collected daily or hourly, as large amounts of data are shuffled. We also wanted to ensure high quality analysis for our services, by using the data in real-time analysis rather than batch analysis. Users can expect to receive information more relevant to them as a result.
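
For the Kafka output listed in this use case, a minimal sketch of publishing a joined micro-batch back to Kafka might look like the following; the broker address and topic name are placeholders, and a production job would reuse producers rather than creating a new one for every batch.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.DataFrame

// Send each row of the joined micro-batch to Kafka as a JSON string so the
// downstream analysis and prediction jobs can consume it.
def publish(joined: DataFrame): Unit = {
  joined.toJSON.rdd.foreachPartition { rows =>
    // One producer per partition, created on the executor that holds the data.
    val props = new Properties()
    props.put("bootstrap.servers", "kafka:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try rows.foreach(json => producer.send(new ProducerRecord[String, String]("ad_log_joined", json)))
    finally producer.close()
  }
}
```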

Future plans

We've gone over how BigDB was conceived, how it's structured, and what it does. Next, I'd like to talk about some things that we've improved.

  • Improved data collection schema
    • BigDB works without a schema during data collection. You can assign schema types and repository locations when tables are created from the collected data.
  • Better processing and reprocessing of collected data
    • We adopted Spark SQL to process data on BigDB, and created UDFs (user-defined functions) to handle more complicated logic.
  • Requesting different types when looking up data
    • BigDB uses Spark SQL when searching tables, and can search inside TSV/CSV file types. You can also use BigDB from a webpage, as we added TSV/CSV/JSON data search capabilities through the REST API.
  • Reduced data joining costs
    • BigDB offers a way to join real-time data in real-time, and is horizontally scalable.

Below are some of our future plans on how we will be using BigDB.

  • Expansion: Support for DBMS cloning
    • We plan to allow tables that are too large to analyze on DBMSs such as MySQL, Oracle, and CUBRID to be cloned to HDFS in advance, so that they can be processed into an analyzable state. Since small data is also frequently used in joins, we are planning to allow cloning all the tables in a DB to HDFS and then converting them to a table format similar to the DB. We are also planning to allow limited appending of incremental data using status info such as "update" or "delete," so that the latest status can be searched; currently only "insert" is supported.
  • Usability: Providing the CLI in a GUI
    • We're planning to improve usability of the terminus.js REST CLI by presenting it through a GUI. You'll be able to use all the commands available on the CLI on the new GUI.
  • Cluster management: Adding more cluster management features such as installation and administration
    • Currently, you need to use shell scripts to download, install, and configure resources that match the type and version of each solution. By adding cluster management features, we plan to let our users use the REST CLI to install and manage clusters, check cluster package installation history, change settings, and so on.

I'd like to thank you all for reading through this post and showing interest in BigDB. We hope to bring you many more interesting updates in the future.

