내부 데이터 파이프라인에 Kafka Streams 적용하기

Kafka Streams 소개

안녕하세요, LINE에서 서버 개발 엔지니어로 일하는 Yuto Kawamura라고 합니다. 저는 HBase, Kafka와 같은 LINE의 핵심 스토리지 설비를 개발, 운영하는 일을 주로 맡고 있습니다.

작년 하반기부터는 IMF(Internal Message Flow 또는 Fund)라는 명칭의 신규 프로젝트도 맡고 있습니다. IMF 프로젝트의 목적은 크게 두 가지입니다.

  • 사내 시스템 간의 이벤트 전달을 일원화된 방식으로 처리하는 데이터 파이프라인 개발
  • LINE 서버 시스템 구성요소 중 백그라운드 태스크 처리를 담당하는 talk-dispatcher 개선

이 두 목적은 서로 연관이 없어 보이지만 저희는 두 부분에 동일하게 Apache Kafka와 스트림 프로세싱 기술을 적용해 보려고 합니다. Apache Kafka는 LinkedIn에서 주로 개발하고 사용해온 대용량 분산 메시징 시스템입니다. 여러 특화된 기능을 제공하고 있는데 가장 중요한 특징은 다음과 같습니다.

  • 디스크에 기반한 영속적인 저장 방식을 사용합니다. 하지만 페이지 캐시를 활용하여 높은 처리량을 제공하는 인메모리(in-memory) 방식에 가깝습니다.
  • 여러 consumer가 한 topic(일종의 queue 개념)으로부터 여러 번에 걸쳐 메시지를 가져올 수 있습니다. 이런 방식이 가능한 이유는 클라이언트가 해당 queue에서 어느 부분까지 데이터를 받아갔는지 위치를 알려주는 ‘offset’을 관리하기 때문입니다.

Kafka 엔지니어링의 기본 원리나 개념 등 재미있는 주제가 많이 있지만 이번 블로그에서는 LINE에서 스트림 프로세싱을 어떻게 구현했는지에 대해 중점적으로 소개하고자 합니다.

Stream processing framework

스트림 프로세싱에는 Apache Storm, Apache Spark, Apache Flink, Apache Samza 등 널리 사용되는 프레임워크가 여럿 있습니다. 저희는 처음에는 Apache Samza를 고려해보았습니다.
Samza는 Kafka와 마찬가지로 LinkedIn에서 만들었습니다. Kafka와 연동하도록 설계되었기 때문에 Kafka와의 친화력이 뛰어나고 Kafka와의 통합을 기본으로 지원합니다. 기본적인 기능 자체에는 문제가 없었지만 얼마 지나지 않아 핵심 인프라스트럭처에 적용하기에는 염려되는 점이 생겼습니다. 주요 비즈니스 로직을 처리하는 곳이며 서비스의 안정성에 영향을 미칠 수 있는 민감한 부분이었습니다.

  • Samza는 YARN에 크게 의존적입니다. YARN은 아주 잘 만들어진 분산 리소스 할당 프레임워크이면서 널리 쓰이고도 있지만 다음과 같은 점들이 우려되었습니다.
    • 최초에는 배치 프로세싱을 위해 설계되었고 스트림 프로세싱은 가능하긴 하지만 일부 Hadoop을 계승한 부분이 스트림 프로세싱에 부적합하다고 느꼈습니다.
    • LINE의 사내 배포 시스템인 PMC(Project Management Console. LINE의 주요 서비스를 관리하는 도구로서 CMDB(configuration management database)의 빌드/배포 기능을 합친 서비스)와의 친화력이 전무합니다.
    • 아키텍처는 최대한 단순하게 유지하고 싶었습니다. 저희는 중단 없이 계속 실행 중인 애플리케이션에 리소스를 분리하거나 할당할 필요가 없었는데, 이유는 다음과 같습니다.
      • 서버는 기본적으로 서로 다른 서비스마다 별도로 할당됩니다.
      • 실제 대부분의 메모리를 소모하는 부분은 heap이지만 JVM에는 최대 heap을 제한하는 옵션이 있습니다.
      • 저희 애플리케이션의 특성상 CPU는 문제가 되지 않습니다.
      • 네트워크 트래픽은 문제가 될 소지가 있는데, YARN은 네트워크 I/O를 조절하는 기능이 없습니다.
  • 사내 엔지니어링 설비는 ‘호스트’ 개념을 철저하게 따르고 있습니다. 예를 들어, 사내 서비스를 모니터링하는 데에 사용하는 자체 개발 툴 IMON은 호스트별로 메트릭을 보거나 알람을 보낼 수 있으며 Kibana는 호스트별로 로그를 저장합니다. 따라서 어떤 호스트가 작업을 수행할지 결정하는 일을 YARN에게 위임하기는 무리가 있습니다.
  • Apache Samza의 개발 활동이 상대적으로 적어보입니다.

물론 YARN은 하나의 리소스 풀로 많은 작업이나 애플리케이션을 배치하고 실행하는데 매우 유용합니다. 저희도 여전히 통계 작업이나 문제 조사 중에 필요한 애드혹 작업 등을 위해 사용하고 있습니다.

Kafka Streams

2016년 3월 10일 Confluent(LinkedIn에서 Apache Kafka를 최초 개발한 사람들이 세운 회사)에서 ‘Introducing Kafka Streams: Stream Processing Made Simple’이라는 제목으로 블로그를 게시했습니다. 이 블로그를 읽으면서 Kafka Streams가 바로 우리에게 필요한 것이라는 생각이 들었습니다(그때까지만 해도 독자적으로 개발할 생각도 했었습니다). 다른 일반적인 Stream processing framework가 ‘실행 프레임워크’인 반면 Kafka Streams는 ‘라이브러리’입니다. Apache Samza에서 가져온 개념도 일부 있지만 중요한 차이가 있습니다.

  • 그냥 라이브러리입니다. 실행 프레임워크가 아니기 때문에 사용자가 수동으로 구동해야 합니다. 특정 프레임워크에 탑재하여 실행할지 public static void main()을 쓸지 여부는 전적으로 개발자가 결정합니다.
  • Kafka에서 제공하는 본래의 기능들을 모두 활용함으로써 간단하게 핵심 기능을 개발할 수 있고 매우 가볍습니다.
  • 직관적인 DSL을 사용하여 프로세싱 토폴로지를 정의할 수 있습니다.
  • Kafka 공식 커뮤니티가 주도하여 개발하고 있기 때문에 매우 활발한 개발 활동을 기대할 수 있습니다.
  • Rolling restart를 지원하기 때문에 실제 서비스 트래픽을 처리하는 와중에도 단일 인스턴스 상에서 업데이트된 애플리케이션의 동작을 확인하기 용이합니다.

Kafka Streams는 Kafka 버전 0.10.0.0과 함께 공개되었습니다. 저희가 처음 Kafka Streams를 시도할 당시에는 Kafka 0.10.0.0이 아직 릴리스 전이었기 때문에 소스 리포지터리로부터 아티팩트를 직접 빌드해야 했었습니다. 또한 Kafka 브로커들 역시 0.10.0.0 버젼 이상이 요구되지만 지금 저희가 사용하는 클러스터는 0.9.0.1입니다. 그래서 호환되지 않는 프로토콜을 다운그레이드하기 위해 클라이언트 라이브러리를 수동으로 패치하는 등의 일부 까다로운 작업이 필요했습니다(물론 새 버전이 나오는 대로 클러스터를 업그레이드할 예정입니다). 그렇다 하더라도 직접 새로운 구현체를 만드는 것보다는 쉬울 거라고 생각합니다.

다음 부분에서는 Kafka Streams에서 제공하는 몇 가지 흥미로운 기능에 대해 설명하겠습니다.

Masterless

Kafka Streams는 일반적인 분산시스템에서 장애 감지, 처리 노드간 조율, 파티션 할당 등을 수행하기 위해 존재하는 마스터라는 개념이 없습니다. 대신 Kafka의 자체 조율 메커니즘에 전적으로 의존합니다. 이는 곧 작업 노드 간의 통신이 필요 없다는 뜻입니다. 주어진 applicationId로 새 KafkaStreams 인스턴스를 실행하면 해당 applicationId에 대한 consumer 중 하나로 Kafka broker를 subscribe합니다. 파티션 할당들이 재조정되거나 failover가 발생하면 Kafka 브로커가 스스로 이를 감지하고 처리하기 때문에 작업 노드들이 이를 위해 통신할 필요가 없어집니다.

High-level-DSL API와 Low-level API

Kafka Streams는 스트림 프로세싱 프로그래밍을 위해 high-level-DSL과 low-level API의 두 가지 API를 지원합니다.

High-level-DSL

일반적인 스트림 프로세싱은 스트림에 transform, filter, join, aggregate 연산 처리를 적용하고 그 결과를 저장하는 과정입니다. 이처럼 기본적인 연산 처리에는 high-level-DSL 인터페이스가 적합합니다. high-level-DSL을 사용하면 Scala Collections API와 상당히 유사한 형태로 collection, transform 등의 연산을 프로그래밍할 수 있습니다. 다음은 IMF 프로젝트에서 개발한 loopback topic replicator의 예입니다(loop topic replicator의 용도는 특정 기준으로 원본 토픽의 메시지들을 필터링하여 새 토픽에 저장하는 것입니다).

KStreamBuilder builder = new KStreamBuilder();
KStream<Long, OperationLog> stream =
     builder.stream(sourceTopic.keySerde(), sourceTopic.valSerde(), sourceTopic.name());
 
Map<String, Set<OpType>> categories = loadCategories();
for (Map.Entry<String, Set<OpType>> entry : categories.entrySet()) {
    String topic = entry.getKey();
    Set<OpType> opTypes = entry.getValue();
    TalkOperationLogV1 destTopic =
            InternalMessageTopics.getTopic(topic, TalkOperationLogV1.class);
 
    stream.filter((key, value) -> {
              TalkOperation op = value.getOperation();
              return op != null && opTypes.contains(op.getOpType());
          })
          .to(destTopic.keySerde(), destTopic.valSerde(), destTopic.name());
}
Properties props = loadStreamsProps();
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

이 코드의 목적은 원본 topic에서 파생된 topic을 opType 카테고리별로 구성하는 것입니다. 다음의 몇 줄로 표현할 수 있는데 직관적으로 해석 가능합니다. 1) topic과 key-value serializers를 지정하여 KStream을 생성, 2) 각 요소에 필터 적용, 3) 결과를 해당 카테고리에 대응하는 topic에 저장하는 형태입니다.

KStream<Long, OperationLog> stream =
     builder.stream(sourceTopic.keySerde(), sourceTopic.valSerde(), sourceTopic.name());
 
...
 
stream.filter((key, value) -> {
          TalkOperation op = value.getOperation();
          return op != null && opTypes.contains(op.getOpType());
      })
      .to(destTopic.keySerde(), destTopic.valSerde(), destTopic.name()); 
      // Store the resulting messages to the topic pointed by destTopic

Low-level API

통상적이지 않은 방식을 필요로 하는 몇몇 희귀한 경우(메시지를 내용에 기반하여 특정 다운스트림으로 보낸다거나)에는 low-level API를 사용해야 합니다. Processor API는 대체로 직관적이기 때문에 특별히 이해하기 어려운 부분은 없을 것입니다. low-level API가 실제로 적용된 사례를 보고 싶다면 링크된 공식 리포지터리의 ‘examples’ 디렉터리를 확인해보시기 바랍니다. 기본적으로 시작단계에서는 high-level-DSL API 사용을 권장하지만, 필요하다면 low-level API도 얼마든지 사용할 수 있습니다.

Fault-tolerance local state DB

스트림 프로세싱을 구현하다 보면 다양한 목적을 위해 state를 보관해 둘 필요가 생깁니다. 로컬 state는 흔히 aggregation, join, windowing을 구현할 때 사용하지만, 다른 경우에 사용하기도 합니다. Kafka Streams에서는 각 프로세서가 고유의 state store를 가질 수 있습니다.
Kafka Streams의 changelog 메커니즘 덕분에 장애가 발생하여 프로세서가 다른 호스트로 failover하게 되면, state DB도 함께 새 프로세서로 이전됩니다. Kafka Streams가 state DB를 위한 물리 store(pluggable하기 때문에 in-memory store나 RocksDB 등으로 얼마든지 갈아끼우는 것이 가능)를 업데이트하는 동안, ‘changelog’ topic을 위한 특수한 메시지를 생성합니다. 이 changelog topic은 로컬 state의 WAL(Write-Ahead-Log)로 간주할 수 있습니다. Kafka topic은 몇 번이든지 반복해서 읽을 수 있기 때문에, 프로세서에 failover가 발생할 때마다 새로운 프로세서가 changelog topic으로부터 읽은 mutation log를 replay하여 로컬 state DB 복구가 가능해집니다. 다시 말해, 프로세서 state를 보관하기 위해 별도의 외부 스토리지를 준비할 필요가 없습니다. Kafka와 Kafka Streams만으로도 충분히 같은 효과를 얻을 수 있기 때문입니다.

Kafka Streams를 이용하여 구현한 것들

Loopback replicator

LINE에서는 우선 Kafka Streams를 써서 Kafka topic replicator를 구현하였습니다. 단순한 클러스터 간 topic 복제가 아닌, 맵/필터 등의 연산을 메시지에 적용하여 topic을 복제하기 위함입니다. 현재까지 가장 주된 사용 목적은 원본 topic에서 메시지를 분류, 용량이 적은 파생 topic을 제공하여 consumer가 더 적은 수의 메시지를 읽도록 함으로써 네트워크 트래픽과 리소스 사용량을 줄이는 것입니다.
예를 들어, 현재 저희는 TalkOperation 로그를 생성하고 있습니다. TalkOperation은 LINE 클라이언트와 talk-server 간의 통신에 필요한 핵심 데이터 구조입니다. 사용량이 피크인 시간대에는 topic으로 수신되는 메시지 수가 초당 100만 개에 육박합니다. 어떤 consumer들은 모든 종류의 TalkOperation에 관여하지만 모든 consumer들이 그렇지는 않습니다.
가령 어떤 consumer가 LINE의 친구관계 기능에 관련된 TalkOperation(ADD_CONTACT, BLOCK_CONTACT 등)만 받고 싶다면 전체 스트림을 읽어들일 필요가 없습니다. 그런 경우에는 이 loopback replicator를 사용하여 친구관계 기능에 관련된 TalkOperation만을 보유한 파생 topic을 제공합니다. 현재 loopback replicator는 단일 Java 애플리케이션으로 배포되고 그 외에 다른 특별한 것은 없지만 아직까지 문제없이 잘 동작하고 있습니다. 테스트를 위해서 인위적으로 몇 번의 failover를 겪도록 한 적은 있지만요.

Decaton

Decaton은 제가 초반에 언급한 talk-dispatcher를 대체하기 위한 백그라운드 태스크 처리 시스템입니다. 기존의 talk-dispatcher에는 아래와 같은 문제점이 있었습니다.

  • 처리방식의 확장성 문제. 특정 talk-server에서 생성된 모든 태스크는 같은 호스트 내에서 구동 중인 로컬 Redis queue에 들어갑니다. consumer인 talk-dispatcher 역시 같은 서버에서 구동되며 로컬 Redis 인스턴스에 있는 태스크만 꺼내서 처리합니다. 서버에서 burst라도 발생하게 되면, 태스크를 소비하는 인스턴스가 하나밖에 없기 때문에 queue의 크기가 걷잡을 수 없이 커집니다.
  • In-memory queue. Queue 서버로 Redis를 사용하고 있기 때문에 서버가 어떤 이유로 종료되면 queue의 내용 역시 모두 잃게 됩니다. 또한, 물리적인 메모리의 한계 때문에 queue에 등록할 수 있는 태스크 수도 크게 제한을 받습니다. 이미 이러한 한계로 인해 과거 몇 차례 대량의 태스크들을 유실한 경험이 있었습니다.
  • Out-of-order 처리. talk-server로 들어오는 요청들은 userId에 기반한 routing이 아니기 때문에, UserA가 보낸 요청은 어느 talk-server든지 수신할 수 있습니다. 그러한 요청에 의해 생성된 태스크는 로컬 queue에 들어가겠지만, 다른 서버로 간 요청으로 인해 발생한 태스크들은 다른 호스트에 있는 queue에 등록됩니다. 이런 태스크들은 각기 다른 talk-dispatcher 인스턴스에 의해 수행되기 때문에 처리 순서가 등록 순서와 상관없이 뒤섞일 수 있습니다.

여기서 Kafka를 이용함으로써, 1. 확장성 있는 분할된 queue, 2. 충분히 빠른 속도의 디스크 기반의 영속적인 queue, 3. 메시지 key로 shuffling하여 순서를 지키며 태스크 처리(경우에 따라 다르지만, 여기서는 userId를 key로 사용하는 것을 예로 듭니다) 등이 가능해집니다.

위의 도면은 Decaton의 작동 원리를 간단히 표현해 본 것입니다. 이는 앞서 말씀드린 대로 talk-dispatcher의 세 가지의 큰 문제점을 해결하는 한편, 서로 다른 프로세싱을 최대한 격리시키는 목적도 있습니다. Kafka 특성 상, Kafka topic은 휘발성이 아니기 때문에 서로 다른 프로세서들이 같은 태스크에 대해 각기 다른 처리를 독립적으로 수행할 수 있습니다. 프로세싱을 격리시키게 되면 동일 consumer 컨텍스트 내에서 연관되지 않은 태스크 처리 중 발생한 스토리지 요청이 오래 걸리거나 실패처럼 보이는 상황에서도 다른 프로세서들의 태스크 처리가 멈추지 않도록 할 수 있습니다.

위의 예로 보자면, TaskProcessorA와 TaskProcessorB는 HBase 서버 장애로 인해 처리가 무기한 멈춰버린 경우에도 StorageMutationProcessor로부터 영향을 받지 않습니다. 이로 인해 태스크가 queue에 쌓이게 되겠지만 이 또한 Kafka topic이 영속적이고 실질적으로 용량제한(디스크 기반이라서)이 없기 때문에 문제가 되지 않습니다.

Decaton은 임의의 topic로 메시지를 분배하는 기능 제공을 위해 low-level API를 이용한 사례 중 하나입니다. 이를 실제로 가능하도록 하기 위한 API 개선이 KAFKA-3497에서 이루어지기도 했습니다.

Kafka에 대한 기여

Kafka Streams를 이용하여 소프트웨어를 개발하면서, 몇몇 버그를 발견하기도 했고 개선사항 몇 가지에 대해 생각하게 된 계기도 되었습니다. 그래서 저는 몇 가지 이슈와 패치에 기여하였고, 이 중 일부는 이미 머지되었습니다. 버그 픽스뿐만 아니라 기능 개선에도 아래와 같이 의견이 반영되었습니다.

Kafka Streams는 아직 개발 초기 단계에 있기 때문에, 유저 의견에 유동적으로 반응하는 편입니다. Confluent 엔지니어들의 지원을 받는 커뮤니티이기 때문에 반응도 굉장히 빠르고, 의견을 개진하는 입장에서 부담을 덜 수 있어서 좋습니다. Kafka 커뮤니티는 완전히 개방되어 있기 때문에 비밀리에 진행되는 비공개 논의 같은 것도 없습니다(적어도 제가 본 바로는 말이죠.

이런 신규 개발 소프트웨어에 기여하는 것은 큰 의미가 있습니다. 그 소프트웨어의 제작 방향성에 직접적으로 관여할 수 있기 때문이죠. 커뮤니티와 힘을 합쳐 제가 필요로 하는 것을 개발해 낼 수 있다는 점은 재밌는 것 같습니다.

결론

Kafka Streams는 앞서 언급한 대로 아직 개발 초기 단계입니다만, 앞으로의 전망이 밝다고 생각합니다. 제가 아는 한, 서비스 성능이나 안정성에 직접적으로 영향을 미치는 주요 애플리케이션과의 연계를 염두에 두고 기초 단계부터 새로 설계된 최초의 Stream processing framework이기 때문입니다. Kafka 자체 또한 상당히 설계가 잘 되어서, 믿고 쓸 수 있는 미들웨어 중의 하나입니다. 만일 Kafka를 이용하여 분산 로그 인프라를 구축할 계획이라면 Kafka Streams를 사용하는 것도 고려해보시면 어떨까 합니다.

관심 있으신 분들을 위해 참고 문서 몇 가지를 마련해봤습니다.

Related Post