LINE Engineering
Blog

LINE 광고 데이터 파이프라인 BigDB

임종규, 신중훈 2017.07.12

임종규: LINE에서 광고 데이터를 처리하고 있습니다. 신중훈: LINE에서 광고 데이터 플랫폼을 개발하고 있습니다.

시작하기 전에

안녕하세요, LINE에서 광고 데이터 처리 및 광고 플랫폼 개발 업무를 담당하고 있는 임종규, 신중훈입니다. 이번 블로그에서는 LINE 광고의 빅데이터를 처리하는 플랫폼인 BigDB를 구현하게 된 배경과 BigDB의 정의, 구조, 기능, Use case를 소개하고자 합니다.

BigDB를 구현하게 된 배경

여러 서비스의 DB 데이터가 서비스의 규모가 커짐에 따라서 분석 업무에 활용하기 어려워지고, 이를 해결하는 방안으로 빅데이터를 지원하는 여러 오픈 솔루션을 사용해 왔습니다. 서로 장단점을 가진 여러 형태의 오픈 솔루션을 업무의 요건에 따라 적절히 사용하다 보면, 상황에 따라서 조금씩 아쉽거나 부족한 부분이 발생하기 마련입니다. 다양한 업무의 데이터 처리를 경험해 오면서, 데이터 처리의 업무를 공통화하기 위해서는 데이터의 수집/가공/재가공/조회 기능을 가능한 한 간편하고 일반적인 형태로 제공하는 것이 좋겠다고 판단했습니다.

BigDB 소개

BigDB란 무엇인가

BigDB는 LINE 광고의 빅데이터 처리 파이프라인으로, 데이터 수집, 가공, 재가공, 조회 등의 기능이 있습니다. LINE 광고의 분석 형태는 사용자가 광고에 노출이 되었을 경우 해당 이벤트를 받아서 즉시 처리하는 실시간 분석과 이벤트를 받아서 1시간이나 1일, 등록한 시간마다 처리하는 batch 분석으로 구분되어 있습니다. BigDB는 분석을 위해 데이터를 제공하고, 필요한 경우 시계열 데이터와 정적인 데이터를 Join하여 제공하는 등 데이터를 유연하게 제공할 수 있습니다.

BigDB에서 제공하는 기능을 간단하게 소개하고 자세한 설명은 아래 기능 상세 소개에서 설명하겠습니다.

  • 편리한 운영을 지원하기 위해 REST CLI(Command Line Interface) 기능을 제공합니다.
  • 스키마의 생성과 관리를 통해 스트리밍 데이터와 정적인 데이터를 hiveContext Table로 생성하는 흐름을 제어하고, 사용자 요청의 데이터 형식을 지정하여 데이터를 조회하는 기능을 제공합니다.
  • 데이터 읽기, 쓰기를 위한 멀티세션을 지원합니다.
  • 분석 전에 분석에 필요한 사용자 요청의 데이터를 Spark SQL 기반으로 실시간 데이터 Join 기능을 제공합니다.
  • 저장소를 크게 두 개로 구분하고, 사용자 요청의 가공 데이터를 스케일에 맞는 저장소를 지정하여 가용성을 확보할 수 있는 구조를 제공합니다.

구조

솔루션 구조

BigDB에서는 아래와 같이 다양한 오픈 소스를 활용하고 있습니다. 이전에는 통합되어 있는 툴을 사용하고 있었으나, 통합된 툴의 경우에는 사용하지 않는 별도의 솔루션을 많이 포함하고 있었습니다. 이런 솔루션이 노드의 자원을 일부 사용하기 때문에 최소한의 꼭 필요한 솔루션으로만 구성함으로써 노드의 제한된 자원을 좀 더 필요한 곳에 할당할 수 있게 되었고, 컨트롤해야 하는 운영의 측면에서도 부담을 덜 수 있었습니다. 한편으로는 각 솔루션에서 업무에 부족한 부분을 사용자 정의로 변경할 수 있었으나, 그 경우 업그레이드나 마이그레이션할 때 의존성 문제가 자주 발생합니다. 그렇기 때문에 최대한 변경은 없게 하고, 솔루션 간의 흐름에 대한 컨트롤 역할을 별도의 BigDB Core와 API로 개발함으로써 특징적인 기능에 대해서는 큰 문제 없이 다룰 수 있게 되었습니다.

  • Message Proxy: akka.http를 사용하여 JSON 데이터를 수집하고, Kafka의 정해진 Topic에 produce합니다.
  • Kafka: 수집된 JSON 데이터를 7일간 보관하며, Partition은 스트리밍에서 사용하는 코어의 개수에 따라 변경됩니다.
  • Streaming: Spark을 사용하며, 5초 주기로 Task를 수행하고 간단한 데이터의 변환 작업 후 스키마에서 지정한 Table에 Data Frame을 InsertInto로 추가하게 됩니다. Streaming이 두 개로 나누어져 있고, 왼쪽은 집계 데이터를 위한 Table, 오른쪽은 원본 데이터와 원본과의 Join 데이터를 위한 Table을 다루게 됩니다.
  • HDFS: 수집된 JSON 데이터를 압축한 후 Parquet 형식으로 저장하게 됩니다. 왼쪽은 SSD 디스크를, 오른쪽은 SATA 디스크를 사용하고 있습니다. Federation 설정으로 각각의 HDFS는 Namespace만으로 편리하게 접근할 수 있습니다.
  • Spark: Zeppelin, Streaming, BigDB 각각 별도의 세션으로 동작을 하며, Hive Meta-Store를 사용하여 Table은 세션 간에 공유됩니다. Locality를 위해서 Table(실제 데이터 파일)이 존재하는 파트의 자원을 사용하게 됩니다.
  • End Point: Web/Zeppelin/BigDB API 등에서 Spark의 자원과 Table을 사용하게 됩니다. 오른쪽에서는 주로 집계와 조회를 하고, 왼쪽에서는 조회와 스케줄링된 작업을 수행하는 용도로 사용됩니다.
  • BigDB Core/API: akka.http를 사용하고 있습니다. 스키마의 생성과 관리, 쿼리 기반의 집계나 스케줄링된 작업의 수행을 관리합니다. terminus.js를 사용하여 REST CLI를 제공합니다. End Point에 결과를 전달하기 위해서 JSON 데이터를 CSV/TSV/JSON 형태로 변환합니다.

시스템 구조

디스크의 종류에 따라 두 개의 클러스터로 구분하여 사용하였습니다. SSD의 경우는 읽기/쓰기가 빠르고 용량이 적은 경우에 사용하고, SATA의 경우는 읽기/쓰기가 SSD에 비해서 느리지만 용량이 큰 노드일 때 사용합니다. SATA를 사용하는 데이터 노드의 경우는 물리적으로 12개의 디스크를 분산해서 사용함으로써 효율성을 확보하고 있습니다. CPU의 경우는 동일하게 구성하였고, Memory의 경우는 디스크의 공간을 고려한 Rack 구성으로 차이 나게 구성하였습니다.
SSD를 사용하는 클러스터는 일반적으로 집계 후의 작은 사이즈를 장기간 보관하는 용도로 사용하며, SATA를 사용하는 클러스터는 원본데이터와 원본데이터 수준으로 가공되는 큰 사이즈를 장기간 보관하는 용도로 사용합니다. 두 개의 클러스터가 모두 수평 확장을 고려하고 있습니다. Memory의 경우는 Parquet를 사용하게 되면서 사용성이 적어지고 있고, 남는 Memory를 Elasticsearch에 일부 할당함으로써 검색이 필요한 데이터 처리에 활용하고 있습니다. 조회 시 대상이 되는 데이터의 사이즈가 크고, 그에 따라서 셔플의 성능을 확보하기 위해 네트워크는 10GB를 사용하고 있습니다. 사용되는 솔루션 모두 가능한 로컬통신을 할 수 있도록 설정하고 있습니다.

  • Proxy: 비즈니스 로직을 최소화해서, 자원의 사용도 최소화하고 스케일의 확장을 쉽게 할 수 있도록 구성합니다.
  • Kafka: Locality 확보를 위해서 Spark, HDFS와 같은 노드에 최소화해서 구성을 하고, 데이터의 버퍼 용도로 사용합니다.
  • Spark & Spark Streaming: Kafka로부터 유실 없는 데이터 처리를 하고, PreferBroker 설정을 통해서 Locality를 확보합니다. Table 데이터 공유를 위해서 Streaming에서는 Data Frame을 InsertInto로 추가해서 사용합니다. Data Frame을 in-memory로 Cache하는 것보다는, Parquet 형식을 사용하여 컬럼별로 쿼리의 성능을 향상시킵니다. Memory는 Spark의 내부에서 사용하는 용도와 검색 용도로, 짧은 기간의 데이터를 저장하는 Elasticsearch에 할당하도록 노드에 구성할 수 있는 최대량을 증설해 둡니다.
  • HDFS: SCR(Short-Circuit Local Reads) 설정을 활성화시켜서 로컬 통신의 효율을 고려합니다. Federation 설정을 통해서는 클러스터 간의 데이터 Path 공유를 편리하게 합니다. 두 개의 NameNode는 HA 구성을 함으로써 장애에 대응합니다. HDFS의 경우 SSD를 사용하는 파트와 SATA를 사용하는 파트로 두 개의 클러스터를 운영합니다. SSD의 경우 쓰기와 읽기의 성능이 우수하지만, 전체 용량은 한계가 있어서 집계 데이터를 쌓아서 활용하고, SATA의 경우 노드별로 물리적으로 12개의 디스크를 no-mirror로 사용함으로써 쓰기와 읽기면에서의 효율을 고려하였습니다. 전체 용량이 크기 때문에 향후 5년 간의 데이터를 유지할 수 있도록 설계했습니다.

기능 상세 소개

REST CLI (Command Line Interface)

terminus.js를 사용하여, REST 방식으로 Command를 운용할 수 있도록 하였습니다. Tab을 누르면 사용할 수 있는 Command Set이 나열되고, 각 Command의 간단한 매뉴얼을 나타냅니다. 운영 상 Table(Schema)을 생성하거나 관리할 때 사용하게 되고, 스케줄러에 쿼리를 등록하여 새로운 Table을 생성할 수 있습니다. 주로 관리자 측면의 기능을 사용하고 있으며 향후 Node의 관리 역할도 추가해 나갈 예정입니다.

Create Table[Schema]

REST CLI를 통해 스키마를 생성하면, 스키마에서 지정된 Source 위치로부터 Data를 읽어서 생성한 Table에 스키마의 형식에 맞게 저장합니다. 스키마의 정보는 Zookeeper 내에 보관하고 있고, Message Proxy와 Spark에서 스키마를 참조하여 Validation 및 형변환을 하게 됩니다. 스키마에 지정된 Table의 파티션 정보를 활용하고, 스트리밍의 짧은 간격으로 입력된 많은 파티션 정보는 매일 새벽 시간에 파티션을 merge합니다. 실제 테스트 시 80만 개 정도의 파티션이 생성되면 성능 문제가 발생했습니다. 스트리밍이 5초 간격으로 파티션을 생성하기 때문에 매일 merge를 수행함으로써 이러한 이슈를 피할 수 있었습니다.

Support [Multi] Session

Table에 입력된 실시간 데이터는 서로 다른 Spark 클러스터에서 바로 사용할 수 있었고, 이를 통해서 입력된 데이터를 Zeppelin, JDBC, Web 등에서 서로 공유할 수 있습니다. InsertInto 동작에서는 스트리밍 시 파티션을 유일하게 생성하는 것으로, 다른 세션의 Spark에서 조회할 때 refresh table을 수행하지 않고도 입력된 실시간 데이터를 사용할 수 있었습니다.

[Realtime] Data Join

Data Join은 분석의 한 형태로 Online 분석의 경우, 실시간으로 여러 Table을 Join하는 기능을 제공함으로써 학습 시 Join하는 큰 비용을 줄일 수 있었습니다. 실시간으로 증가하는 시계열 데이터와 정적으로 제공되는 메타 데이터 및 사용자 데이터를 대량으로 Join하는 경우 데이터의 위치가 서로 다른 이유로 많은 비효율적인 이슈가 발생하게 됩니다. 이 경우 실시간으로 입력되는 데이터의 사이즈가 가장 작은 시점에 Join하게 함으로써, Join 비용을 최소화할 수 있었습니다.

Support Input/Output Spec ([ ] : Beta Phase)

지원하는 Output의 형태는 Table, Kafka, File, Elasticsearch와 Web에서 사용하기 위한 TSV, CSV, JSON 형태입니다. 기본적으로는 Kafka가 Input이 되며, 형태에 따라 Kafka 없이 HDFS에 바로 적재된 데이터를 사용하기도 합니다. BigDB API를 이용해서 스키마를 생성할 때 Input/Output은 정의할 수 있게 되어 있습니다. Join의 경우는 스키마 생성 이후 BigDB API를 이용해서 스케줄러에 쿼리와 수행 간격을 등록할 수 있습니다. 집계의 경우도 스케줄러에 쿼리와 수행 간격을 등록할 수 있습니다. 쿼리 결과 데이터 양에 따라 두 개의 HDFS 중에서 적절한 곳을 선택할 수 있습니다.
Input Join Target Support Output
Kafka + None = HiveContext Table
HDFS Kafka
Kafka File(TSV, CSV)
[DBMS] None Elasticsearch
HDFS For Web(TSV, CSV, JSON)
Kafka .

Use Case

기간별 데이터 조회 및 분석

광고사업 부서나 기획 부서 등의 회사 내 사용자의 경우, 주로 기간별 데이터 조회를 통한 분석과 보고 업무를 하고 있습니다. 공통으로 사용하게 되는 집계 데이터는 별도로 Dashboard를 Web으로 구성하여 사용하고 있습니다.
  • Input: Kafka (Realtime Advertisement Impression & Click Log Data)
  • Join Target: None
  • Support Output: JSON (For Dashboard web page)
  • 목적: 5초 주기의 실시간 광고 로그 메시지를 생성된 스키마에 따라 지정된 Table에 InsertInto로 추가하고 누적된 실시간 데이터를 등록한 집계 쿼리와 수행 간격에 따라 새로운 집계 Table을 생성하고 집계를 수행합니다.
  • 성능: 5초마다 10,000건~100,000건의 스트리밍 데이터를 1초 이하로 저장할 수 있었고, 3개월 동안의 누적 데이터 조회는 수 초 이내로 수행할 수 있었습니다.
  • Dashboard web 페이지에서는 BigDB API를 통해서 주기적으로 집계된 데이터를 JSON 형태로 받아서 사용합니다. 집계된 데이터이기 때문에 사이즈도 많이 줄어들고 기간에 따라 수 초에서 수십 초 사이로 web에 표시할 수 있었습니다. Dashboard는 간단하게 ad-hoc 쿼리를 사용하는 경우와 machine learning을 통한 결과 조회 등으로 구분하고 있습니다.

ad-hoc 쿼리

광고사업 부서나 기획, 개발 부서 등의 회사 내 사용자의 경우, 업무적으로 발생하는 분석 작업을 위해 BigDB로 만들어진 Table에 ad-hoc 쿼리를 수행하여 조회하고 있습니다. 개발 부서 사용자의 경우는 좀 더 복잡한 비즈니스 로직을 처리하기 위해 ad-hoc 쿼리 및 UDF를 추가적으로 개발하여 사용하기도 합니다. 이를 위해서 주로 Zeppelin을 사용하고 있습니다.
  • Input: Kafka (Realtime Advertisement Impression & Click Log Data)
  • Join Target: None
  • Support Output: Table (For Zeppelin)
  • 목적: 실시간으로 입력되는 데이터의 스키마를 생성하고 지정된 Table에 누적함으로써 Zeppelin을 활용하여 ad-hoc 쿼리를 할 수 있습니다. 또한 정해진 쿼리를 등록하고 집계 Table에 결과를 누적함으로써 집계에 대한 데이터의 ad-hoc 쿼리를 할 수 있습니다.
  • 성능: 5초마다 10,000건~100,000건의 스트리밍 데이터를 1초 이하로 저장할 수 있었고, 집계 쿼리의 유형에 따라 수 초 이내로 수행할 수 있었습니다. 수행 시에는 자원을 점유하기 때문에 필요한 경우 실시간 Join을 통해서 가공 후 재처리를 하게 됩니다.

한번 누적한 데이터는 여러 세션에서 refresh table을 수행하지 않고도 바로 사용할 수 있도록 파티션 키를 누적할 때 유일한 키를 사용해서 구분하고, 스트리밍 간격이 짧아짐에 따라 파티션 키가 많아지는데, 이것은 매일 한 번씩 merge함으로써 파티션의 증가를 해결했습니다. 원본 데이터는 사이즈가 크기 때문에 운영상 필요한 경우만 접근하고 되도록 업무상 필요한 집계를 수행하도록 하여 결과 조회의 효율을 높였습니다.

분석 및 예측을 위한 Online 데이터 Joiner 역할 (beta phase)

사용자에게 좀 더 유익한 광고를 전달하기 위해서, 실시간으로 이벤트를 수집하고 분석하여 온라인으로 분석된 광고를 제공하기 위한 많은 시도를 하고 있습니다. 이 경우 대량의 데이터를 처리하기 위하여 발생하는 여러 가지의 데이터 정보를 BigDB를 사용하여 Table로 만들고 있으며, 분석 작업 시 시간이 많이 소요되는 Join 과정을 실시간으로 처리함으로써 분석에 대한 효율을 높이고 있습니다.
  • Input: Kafka (Realtime Advertisement Impression & Click Log Data)
  • Join Target: HDFS (Daily User Demo Data)
  • Support Output: Kafka & Table
  • 목적: 5초 주기의 실시간 광고 로그 메시지와 일별 집계된 사용자의 데모 정보를 실시간으로 Join함으로써 분석 시 Join 비용을 줄이고, 온라인으로 광고에 대한 CTR(click-through rate)을 높이는 분석을 수행합니다.
  • 성능: 5초마다 10,000건~100,000건의 스트리밍 데이터를 1초 이하의 시간 동안 저장할 수 있었고 해당 스트리밍 데이터와 7천 만건 데이터의 Join 시 2초~3초가 소요되었습니다.

일별 또는 시간별로 데이터를 Join하는 경우 대량 데이터의 셔플 발생으로 인해 Join에 대한 비용이 커집니다. 또한 batch 분석보다는 광고의 실시간 분석에 바로 사용함으로써 서비스의 질적 분석에 대한 성과를 확보하고자 했습니다. 사용자 입장에서는 좀 더 필요한 정보를 광고를 통해 제공받을 수 있다는 장점도 있습니다.

마무리와 향후 계획

앞서 BigDB를 개발하게 된 배경과 구조, 기능에 대해서 말씀드렸습니다. 이번에 개선한 기능을 아래에 간단하게 요약해봤습니다.

  • 데이터 수집 시 스키마 개선
    • BigDB는 수집을 할 때는 스키마의 형태가 없고, 수집 후 Table을 생성하는 시점에 스키마의 형태와 저장소의 위치 등을 지정할 수 있습니다.
  • 수집 데이터의 가공 및 재처리
    • BigDB는 Spark SQL을 사용하여 수집된 데이터를 가공하도록 하였고, 복잡한 로직은 UDF(User-Defined Functions)를 생성하여 가공할 수 있습니다.
  • 조회 시 다양한 형태의 데이터 요청
    • BigDB는 Table을 조회할 때 Spark SQL을 사용하거나, TSV/CSV와 같은 파일 형식에 대한 조회 기능을 제공합니다. 그리고 web 페이지에서 사용할 수 있도록 REST API를 통해 TSV/CSV/JSON 형태로 사용자 요청의 데이터에 대한 조회 기능을 제공합니다.
  • 데이터 Join 비용의 절감
    • BigDB는 실시간 데이터의 경우 실시간으로 Join하는 방법을 제공하고, 수평 확장을 통해서 스케일 아웃이 가능합니다.

또한, 앞으로 BigDB를 활용할 계획을 항목으로 나눠서 정리해봤습니다.

  • 확장: DBMS로부터 복제 지원
    • MySQL, Oracle, CUBRID와 같은 DBMS의 분석에 활용하기 어려운 사이즈의 테이블인 경우, 미리 HDFS에 복제해서 분석할 수 있는 상태를 만들도록 확장할 계획입니다. 또한 작은 사이즈의 데이터라도 자주 Join에 활용되기 때문에 DB의 전체 테이블을 미리 HDFS에 복제하고 이를 DB와 비슷한 Table의 형태로 제공하려고 합니다. Incremental 데이터의 경우는 제한적이긴 하지만 update, delete 등의 상태값을 활용하여 append함으로써 최신 버전의 데이터 상태를 조회하는 방식으로 응용할 계획이 있습니다. 현재는 insert 경우에만 지원합니다.
  • 편의성: CLI를 UI로 제공
    • terminus.js를 사용하여 제공 중인 REST CLI를 UI로 제공함으로써 운영상의 편의성을 추가할 계획입니다. CLI에서 제공되는 명령어 세트도 UI로 제공할 계획입니다.
  • 클러스터 관리: Cluster Manager 기능(설치/운영) 추가
    • 현재는 셸 스크립트에서 부트스트랩 서버로부터 솔루션의 종류와 버전에 해당하는 리소스를 다운로드해 설치, 설정하도록 되어 있습니다. Cluster Manager 기능을 추가함으로써 REST CLI를 활용하여 클러스터에 설치 및 관리할 수 있게 하고, 클러스터의 설치된 패키지 내역의 확인과 설정 변경 등을 가능하도록 할 계획입니다.

긴 글 읽어주셔서 감사드리며, 앞으로도 LINE 광고 데이터 파이프라인 BigDB에 대해 많은 관심 가져주시기 바랍니다.

Big-Database Big-DB Data-pipeline

임종규, 신중훈 2017.07.12

임종규: LINE에서 광고 데이터를 처리하고 있습니다. 신중훈: LINE에서 광고 데이터 플랫폼을 개발하고 있습니다.

Add this entry to Hatena bookmark

리스트로 돌아가기