Kafka와 MongoDB, Kubernetes로 유연하고 확장 가능한 LINE 쇼핑 플랫폼 구축하기

들어가며

안녕하세요. LINE Commerce Platform에서 LINE 쇼핑(일본어) 개발을 담당하고 있는 유재균입니다. LINE 쇼핑에선 여러 판매자로부터 전달 받은 상품 정보를 한곳에 모아서 일정한 기준에 따라 분류하고, 그룹화한 뒤, 정렬하여 사용자들이 상품 정보를 쉽게 얻을 수 있도록 제공하고 있습니다. 네이버 쇼핑이나 다나와 등이 LINE 쇼핑과 동일한 모델이라고 할 수 있습니다.

LINE 쇼핑의 판매자는 경우에 따라 수십에서 수천만 개 이상의 상품 정보를 보유하고 있는데요. 이런 판매자들의 상품을 한곳에 모아서 서비스하려면 어떻게 플랫폼을 구축해야 할까요? 이번 글에선 저희 팀이 이 어려운 과제를 해결하기 위해 어떤 플랫폼을 구축했는지 소개하려고 합니다.

 

요구 사항

LINE 쇼핑 플랫폼의 요구 사항을 비즈니스 및 사용자 경험 측면과 시스템의 유연성 측면으로 나누어 살펴보겠습니다. 

 

사용자 경험 개선

LINE 쇼핑에서 하루에 처리해야 하는 상품의 개수는 약 4억 5천만 개 정도인데요. 불행하게도 처리 요청이 24시간에 걸쳐 균등하게 분배되지 않아서 대부분 새벽에 처리해야 합니다. 또한 4억 5천만 건의 상품 중 많게는 하루에 약 2~3억 개의 상품에서 변경이 발생합니다. 이를 처리하는 작업은 데이터베이스에 대량의 DML(Data Manipulation Language)을 동반하고, 검색 엔진에 색인 작업도 해야 하는데요. 만약 이 작업 처리가 늦어지면 LINE 쇼핑 사용자는 아래와 같은 나쁜 사용자 경험을 하게 됩니다. 

  • 판매자가 새로운 상품을 판매하기 시작했으나 LINE 쇼핑에서는 찾을 수 없다.
  • 판매자가 상품의 가격을 변경했으나 LINE 쇼핑에서는 과거의 가격을 보여준다.
  • 판매자의 상품이 품절되었으나 LINE 쇼핑에는 판매 중으로 표시되고 사용자는 판매자의 사이트로 불필요하게 이동하게 된다.

사용자가 위와 같은 경험을 하지 않게 하려면 4억 5천만 개의 상품을 최대한 빠르게 처리해야 합니다.

 

시스템 유연성 개선

이번 플랫폼 개선 전에는 RDBMS(Relational Database Management System)인 Oracle 중심의 시스템(이하 레거시 시스템)을 구성하여 서비스하고 있었습니다. 상품 개수가 수천만 개 정도인 경우에는 Oracle을 사용해도 문제가 되지 않습니다. 하지만 수억 개를 처리하려면 고려해야 할 사항이 급격하게 늘어납니다. 아무리 고성능의 Oracle이라 해도 RDBMS의 특성상 스케일 아웃(scale-out)이 어렵고, 스케일 업(scale-up)에도 한계가 있으며, 이에 따른 비용 증가도 감당해야 합니다. 또한 상품 수가 1억 개가 넘는 판매자가 신규로 입점하거나 이벤트 진행 등의 영향으로 데이터의 변경이 급증하는 상황에도 대비할 수 있어야 합니다. 이와 같은 상황을 고려한 시스템 유연성 측면의 요구 사항은 아래와 같이 정리할 수 있습니다.

  • 비용을 들인 만큼 처리량이 늘어나야 한다.
  • 처리량에 한계가 없어야 한다.
  • 서버를 유연(flexible)하게 사용할 수 있어야 한다.

 

시스템 구성

사용자 경험 측면과 시스템의 유연성 측면으로 나누어 살펴본 요구 사항은 아래와 같이 요약할 수 있습니다.

  • 데이터를 배치가 아닌 이벤트 기반으로 처리할 수 있어야 한다.
  • 데이터베이스는 스키마에 종속적이지 않으면서 스케일 아웃이 가능해야 한다.
  • 서버를 쉽게 구성할 수 있어야 하며 자원을 효율적으로 사용할 수 있어야 한다.

이런 요구 사항을 만족시키기 위해 아래와 같이 플랫폼을 구성했습니다.

요구 사항사용 기술
이벤트 기반 데이터 처리Apache Kafka, Kafka Connect
저장 공간에 제약이 없고 필요에 따라 확장할 수 있는 데이터베이스MongoDB
빠르게 배포할 수 있고 필요에 따라 확장할 수 있는 서버Kubernetes

이제 각각을 어떻게 유기적으로 엮어서 사용하고 있는지 좀 더 자세히 설명하겠습니다.

 

Kafka와 MongoDB

레거시 시스템은 데이터를 배치로 처리했습니다. 일정량의 데이터를 모으고, 처리하고, 저장하는 과정의 연속이었죠. 이런 형태의 처리는 큰 약점이 있습니다. 데이터를 배치 단위로 처리하다 보니 처리 단위의 앞부분에 위치한 데이터가 나머지 뒷부분의 데이터가 처리될 때까지 대기해야 한다는 점입니다. 물론 처리 성능을 극대화하기 위해 배치로 처리해야 하는 부분들도 있지만, 저희는 상품의 변화를 조금이라도 빠르게 사용자에게 전달하고 싶었습니다. 그래서 배치 처리 방식을 버리고 이벤트 기반 아키텍처(Event Driven Architecture, 이하 EDA)로 전환하기로 결정했으며, 이를 위한 메시지 브로커(broker)로 Kafka를 선정했습니다. 

Kafka는 메시지를 다시 읽어오는 부분에 강점이 있습니다. 동일한 토픽(topic)을 여러 컨슈머(consumer) 그룹이 서로 다른 관점에서 처리하기 용이한 구조입니다. 아래에서 좀 더 자세히 설명하겠지만 상품 데이터에는 이미지와 카테고리, 브랜드, 색상 등 여러 가지 정보가 포함되어 있고, 각 정보는 MSA(MicroService Architecture)로 구성된 여러 모듈이 각자의 관심사에 따라 독립적으로 처리합니다. 이런 구조 덕분에 상품 처리가 매우 빨라졌는데요. Kafka를 백본(backbone)으로 구성한 덕분에 EDA와 MSA를 동시에 얻게 된 셈입니다.

EDA는 누군가가 발생시킨 이벤트를 시발점으로 움직이며, 발생한 이벤트는 어떤 형태로든 데이터의 변경을 수반합니다. 이벤트를 발생시키는 주체는 다양합니다. 대표적으로 판매자가 상품 데이터를 변경해서 전송하는 경우가 있으며, 운영자가 관리 도구에서 변경하거나 시스템이 처리 과정에서 변경하는 경우도 있습니다. 그런데 이런 데이터 변경 지점을 하나하나 찾아서 Kafka에 프로듀싱하는 건 상당히 부담스러운 일입니다. 그래서 저희는 데이터 변경을 데이터베이스에서 감지하기로 했습니다. 데이터가 변경되면 어딘가에 저장해야 하는데 그곳이 바로 데이터베이스이기 때문입니다. 

널리 사용되는 대부분의 DBMS는 데이터 수준의 로그를 남깁니다. 이런 로그들은 백업 및 포인트 복원 용도로 사용하기도 하고 복제 용도로 사용하기도 하는데요. 저희는 데이터의 변경을 감지하기 위해 MongoDB와 MySQL에 쌓이는 로그를  Kafka Connect를 이용해 Kafka로 전달하기로 했습니다. Kafka Connect를 이용하면 이기종 시스템 간에 데이터를 쉽게 이동시킬 수 있습니다. 이때 데이터는 Kafka를 통해 스트림으로 전송되고 스트림으로 저장됩니다. 

KAFKA connect
참고로 현재 Kafka 커넥터(connector)를 제작할 때 JDBC와 같은 통일된 규격이 존재하지 않습니다. 따라서 커넥터마다 사양이 제각각이며 각각 장단점이 있기 때문에, 각자의 요구 사항에 맞는 커넥터를 찾기 위해서는 각각의 사양을 정확하게 이해하고 다양한 테스트를 수행해 보는 것이 좋습니다. 저희는 Debezium에서 제공하는 커넥터를 사용했습니다.

Kafka와 Kafka Connect를 이용하면 변경된 데이터(Change Data Capture: 이하 CDC)를 쉽게 얻을 수 있고, 서로 다른 기종 간의 데이터를 통합하는 것도 어렵지 않게 구현할 수 있습니다. 아래 JSON 데이터는 MongoDB에서 업데이트가 발생했을 때의 CDC 토픽 데이터입니다.

{
   "after":null,
   "patch":"{\"$v\": 1,\"$set\": {\"currentDateTest1\": {\"$date\": 1580807056954},\"currentDateTest2\": {\"$timestamp\": {\"t\": 1580807056,\"i\": 1}},\"currentDateTest3\": {\"$date\": 1580807056954},\"incTest1\": 11.0,\"incTest2\": 9.0,\"incTest3\": 11.6,\"incTest4\": 9.4,\"maxTest2\": {\"$numberLong\": \"100\"},\"minTest1\": {\"$numberLong\": \"1\"},\"mulTest1\": 20.0,\"mulTest2\": {\"$numberDecimal\": \"12.089\"},\"renameTest2\": \"abc\",\"setTest\": \"setTest\"},\"$unset\": {\"renameTest\": true,\"unsetTest\": true}}",
   "source":{
      "version":"0.10.0.Final",
      "connector":"mongodb",
      "name":"line-shopping-mongo",
      "ts_ms":1580807056000,
      "snapshot":"false",
      "db":"lineShopping",
      "rs":"mongod_replset_1",
      "collection":"products",
      "ord":2,
      "h":4375978282806671334
   },
   "op":"u",
   "ts_ms":1580807056958
}

"op" : "u" 항목을 통해 업데이트가 발생했다는 것을 알 수 있고, "patch" 항목을 살펴보면 어떤 칼럼이 어떻게 바뀌었는지도 확인할 수 있습니다. 이런 방식으로 MongoDB의 모든 변경 사항이 Kafka의 CDC 토픽에 쌓입니다. 물론 MySQL의 변경 데이터도 Kafka의 CDC 토픽에 쌓을 수 있으며, 아래에서 다시 말씀드리겠지만 KSQL을 통해 성격이 전혀 다른 두 데이터베이스의 데이터를 조인(join)하는 것도 가능합니다. 

이제 모든 데이터가 Kafka에 모입니다. Kafka는 데이터 전송의 백본 역할을 수행하는데요. 관심사나 용도에 따라 다양한 컨슈머만 추가하면 됩니다.

판매자 데이터를 받아와 프런트 엔드에 노출할 때까지의 데이터 파이프라인 구성
데이터 파이프라인의 상품 수신 부분

각각의 역할을 간단하게 정리해 보았습니다.

MongoDB주 저장소입니다. 일반적으로 NoSQL에 적합한 데이터를 저장합니다.
MySQL주 저장소입니다. 일반적으로 RDBMS에 적합한 데이터를 저장합니다.
KafkaMongoDB와 MySQL에서 CDC 데이터를 프로듀싱하여 CDC 토픽에 저장합니다.
Elastic Search관리 도구에서 사용하는 검색과 일부 프런트 엔드에서 사용합니다. CDC 토픽을 컨슈밍(cousuming)하여 동기화합니다.
Hadoop (M/R)EDA로 처리하기 어려운 대용량 데이터 분석 및 통계 용도로 사용합니다.
HBase상품의 변경 이력을 추적하는 용도로 사용합니다. 언제, 누가, 무엇을 변경했는지 상세하게 기록을 남깁니다.

사실 CDC를 위해 데이터 변경 지점에서 DBMS와 Kafka에 이중으로 쓰는(dual write) 형태로 처리할 수도 있습니다. 다만 DBMS에 저장된 데이터의 순서와 Kafka에 저장된 순서가 일치해야 하는데요. 이를 위해서는 전혀 다른 성격의 데이터를 하나의 트랜잭션으로 묶어야 합니다. 반면에 Kafka Connect를 사용한 CDC 데이터는 MongoDB의 Oplog와 MySQL의 BINLog를 테일링(tailing)하고 있기 때문에 DBMS의 순서와 정확하게 일치합니다(물론 MongoDB의 샤딩 키와 Kafka의 파티셔닝 키는 같은 것을 사용해야 합니다). 따라서 변경된 데이터로 어떤 처리를 하는 프로세스를 추가하거나 새로운 저장 플랫폼에 연동하는 작업이 필요할 땐 간단하게 컨슈머 하나만 추가하면 되며, Kafka Connect의 Sink Connector를 이용하면 컨슈머를 추가하지 않고도 가능합니다. 각 스트림 프로세서는 역할이 구분되고 최소화되어 있기 때문에 MSA 관점에서도 잘 맞습니다. 아래 그림은 Kafka와 MongoDB를 Kafka Connect로 유기적으로 연결해 완성한 모습입니다.

 

MongoDB와 MySQL 데이터 조인

저희는 확장성이 필요한 벌크 데이터는 MongoDB에, 트랜잭션 처리와 정교한 모델링이 필요한 데이터는 MySQL에 각각 따로 저장합니다. 하지만 서비스를 위해서 필연적으로 서로 다른 두 기종 간의 데이터 조인이 필요한데요. 현재 아래와 같이 두 가지 방법으로 조인을 수행하고 있습니다.

 

애플리케이션에서 조인

애플리케이션에서 MongoDB와 MySQL의 조인을 수행합니다. 각각의 시스템에서 데이터를 조회해 애플리케이션에서 합쳐서 사용하는 방법입니다.

 

Kafka Streams 혹은 KSQL을 이용한 조인

Kafka Streams 혹은 KSQL을 이용해 아래와 같은 순서로 조인을 수행합니다.

  1. MongoDB와 MySQL의 변경 데이터를 Kafka Connect를 이용하여 각각의 CDC 토픽으로 전송합니다.
  2. Kafka Streams 또는 KSQL을 이용하여 두 토픽을 실시간으로 조인합니다.
Kafka Streams를 이용한 이기종 간의 데이터 조인

 

Kubernetes 활용

저희는 MongoDB와 Kafka를 이용해 데이터의 저장 및 이동에 대한 확장성(scalability)를 확보했습니다. 이제 남은 부분은 데이터를 처리해서 사용자에게 보여주는 부분입니다. LINE 쇼핑 서비스에서 사용하는 서버는 수십 대가 넘기 때문에 이 많은 서버들을 유연하게 사용하고 관리하기 위해선 상당히 많은 노력이 필요한데요. Kubernetes(이하 K8S)는 이러한 문제점을 해결하기에 아주 좋은 도구입니다. 저희가 K8S를 통해 얻고자 하는 부분은 아래와 같았습니다.

 

컴퓨팅 자원 사용 최적화

LINE 쇼핑 상품은 주로 새벽 시간대에 많은 변경이 발생합니다. 상품 정보를 보내주는 판매자 쪽 시스템에 여유가 생기는 시간대가 보통 새벽이라서, 대부분의 판매자가 이때 전체 상품을 전송하기 때문입니다. 반면 이 시간대에 관리 도구나 프런트 엔드에서는 접속이 거의 발생하지 않습니다. 따라서 각 영역에 시스템 자원을 고정적으로 할당하면 자원 낭비가 발생할 수밖에 없는데요. K8S를 이용하면 이런 낭비를 막을 수 있습니다. K8S는 자원을 노드라는 풀(pool) 형태로 관리하면서 애플리케이션에 따라 적절한 노드를 자동으로 선택하며, 수집된 지표를 이용해 애플리케이션의 자원 요구량을 자동으로 측정 및 판단해 오토 스케일링이 되도록 구성할 수도 있습니다. K8S에서는 기본적으로 다양한 지표를 수집하며 Kafka의 서드 파티 도구인 Burrow 같은 것을 이용하면 필요한 수치나 지표를 추가로 수집할 수 있습니다.

 

애플리케이션 개발 및 배포의 편리성

애플리케이션을 작은 역할 단위로 개발하고 파이프 라이닝하여 배포할 수 있습니다. OS의 종류나 버전에 구애받지 않기 때문에 Docker 구동 환경이라면 어디서든 작동이 보장됩니다. 또한 Rolling이나 Blue-green 등의 배포 전략을 쉽게 적용할 수 있으며, Istio를 이용하면 Canary 배포도 구현할 수 있습니다.

 

모니터링 및 장애 자동 복구

K8S는 배포된 애플리케이션 및 노드를 지속적으로 모니터링하며 파드(pod)나 노드에 문제가 발생한 경우 다른 적절한 노드로 이동시켜 장애 상황을 벗어날 수 있습니다.

 

인프라 구축의 용이성

막강한 helm chart를 통해 복잡한 인프라나 프레임워크를 단 몇 줄의 명령어로 설정할 수 있습니다.

위와 같은 장점을 살려서 현재 아래와 같은 영역에서 K8S를 사용하고 있습니다.

  • 판매자로부터 전달받은 상품을 Kafka에 프로듀싱하는 용도
  • CDC 데이터를 컨슈밍해서 카테고리 매칭이나 브랜드 혹은 제조사 매칭, 이미지 생성, 데이터 전송 등을 처리
  • RESTful API 서비스
  • 프런트 엔드 서비스

이후 K8S 관련 경험이 쌓이면 스테이트풀(stateful) 서비스인 Kafka도 적용할 계획입니다.

EDA 및 K8S를 적용한 구성도

 

HBase를 이용한 데이터 변경 추적

쇼핑은 금전 처리가 포함된 서비스이기 때문에 데이터가 어느 시점에 어떤 값이었는지가 매우 중요합니다. 이렇게 데이터의 히스토리를 추적할 필요가 있을 때 HBase를 활용할 수 있습니다. HBase는 칼럼형(column-oriented) 데이터베이스입니다. 변경된 데이터만 칼럼 단위로 저장해 저장 공간 사용을 최소화할 수 있는 구조입니다.

칼럼 단위 버저닝(versioning)을 지원하는 HBASE의 특징을 이용하면 아래와 같이 특정 상품의 특정 칼럼이 변경된 히스토리를 조회할 수 있습니다.

또한 HBase는 Hadoop 에코 시스템들과의 궁합이 좋습니다. Hive와 연동하여 대량 데이터를 SQL 기반으로 처리하기에 유용하며, 스냅샷 기능을 이용하면 특정 시점의 데이터를 추출하는 것도 간단하게 구현할 수 있습니다.

 

트러블 슈팅 – MongoDB 

보통 DBMS의 사용 패턴을 살펴보면 대부분 쓰기보다 읽기 비율이 현저하게 높습니다. 하지만 LINE 쇼핑은 약 4.5억 개의 상품을 매일 처리하며 판매자에 따라서 하루에도 여러 번 상품을 처리하기 때문에 그렇지 않습니다. 예를 들어 상품 수가 1억 개인 판매자가 어느 날 할인 이벤트를 진행하면 최소 1억 개의 DML이 발생하는 형태입니다. 변경이 발생한 데이터를 빠르게 처리해서 검색 엔진에 색인까지 마쳐야만 비로소 사용자가 해당 상품을 검색할 수 있기 때문에 처리 시간을 줄이는 것이 LINE 쇼핑 플랫폼의 핵심 포인트이자 목표인데요. 쓰기 비율이 높은 LINE 쇼핑의 DBMS 사용 패턴 때문에 응답 속도가 급격하게 느려지는 문제가 발생했습니다. 

 

발생한 문제

처음엔 샤드 7개 x 샤드 당 레플리카 3개로 MongoDB를 구성했습니다. DML이 많이 발생하지 않는 일반적인 상황에서는 아래와 같습니다.

그런데 판매자가 대량으로 상품을 변경하거나 새로운 항목을 추가 혹은 변경한 경우에는 아래와 같이 MongoDB에 많은 부담이 발생합니다.

MongoDB는 빠른 처리를 위해 메모리를 적극적으로 활용합니다. 하지만 위와 같이 많은 데이터의 변경이 발생하면 캐시 히트율이 떨어지는 것은 물론, 데이터 변경을 메모리에서 처리하지 않고 바로 디스크에 쓰는 모드로 전환되면서 I/O 처리를 기다리는 시간이 증가하고 응답 속도가 급격하게 느려지는 문제가 발생했습니다.

 

하드웨어 개선

이러한 문제는 MongoDB 때문에 발생한 것은 아닙니다. Oracle을 사용할 때도 같은 문제가 있었으며, 디스크 I/O가 많은 데이터일수록 이러한 현상이 자주 발생합니다. 하지만 Oracle과 MongoDB는 해결 방법이 다릅니다. Oracle은 더 많은 메모리를 증설하고 더 비싼 SAN(Storage Area Network) 스토리지와 더 빠른 CPU로 변경하는 것과 같은 스케일 업 형태의 대응만 가능한 반면에, MongoDB는 샤드를 추가하기만 하면 됩니다. 실제로 오픈 초기에 예상했던 것보다 MongoDB의 사용률이 높았기 때문에 7개였던 샤드를 10개로 증설하면서 문제는 간단하게 해결되었습니다. 증설 과정도 Oracle과 다르게 온라인 상태에서 가능했기 때문에 다운 타임이 없었으며, 아래와 같은 간단한 과정으로 증설할 수 있었습니다.

  1. 추가할 샤드 장비 설정
  2. 밸런싱을 활성화하여 청크(chunk) 마이그레이션이 되도록 설정
sh.enableBalancing(“lineShopping.products”)

 

로직 개선

위와 같이 하드웨어의 힘을 빌리는 것과 동시에 로직을 개선해 디스크 I/O를 줄이는 노력을 병행했습니다. 상품 수신 프로세스의 시작은 4.5억 건의 상품 정보를 판매자로부터 전달받아 이전에 받은 데이터와 비교해 변경된 부분이 있는지 확인하는 것입니다. 상품 데이터에는 상품명과 브랜드, 가격, 상세 설명 등 약 60여 개의 필드가 존재하며, 개당 크기는 수백~수천 바이트 정도됩니다. 간혹 데이터의 변경이 많이 발생하는 경우도 있지만, 대부분 전체 상품의 10분의 1 정도만 변경됩니다. 따라서 이 데이터를 매번 디스크에서 읽는 것은 그다지 효율적이지 않습니다. 대신 각 상품에 대응하는 해시를 저장해 두고 이 값을 비교하여 변경 여부를 판단하는 프로세스를 고려했습니다. 이 방법으로 수천 바이트의 데이터를 읽어오는 대신 적은 양의 데이터만 읽어서 비교한 뒤, 변경이 발생한 경우에만 오버라이트한 결과 디스크 I/O의 상당 부분을 줄일 수 있었습니다.

 

마치며

이번 프로젝트에 사용한 기술 스택을 아래와 같이 그림으로 정리해 보았습니다.

Kafka와 MongoDB, K8S를 주축으로 파이프 라이닝을 구성한 결과 독립적이고 유연하며 쉽게 확장할 수 있는 상품 수신 플랫폼이 구축되었습니다. 각 모듈(스트리밍 프로세스)은 최소한의 기능만 갖고 있어 MSA에 적합하며, EDA 기반의 데이터 핸들링으로 데이터의 변화에 빠르게 대응할 수 있고, 시스템은 언제든지 자유롭게 자동으로 확장되며, 스스로 장애를 복구합니다. 새로운 플랫폼과 함께 더욱 다양한 서비스를 제공하여 사용자에게 좀 더 나은 사용자 경험을 제공할 수 있을 것으로 기대합니다.