内部データパイプラインへのKafka Streamsの適用

Kafka Streamsのご紹介

こんにちは。LINEでサーバ開発エンジニアとして働いているYuto Kawamuraです。主にHBase、KafkaといったLINEの中核的なストレージを開発・運営しています。

昨年下期からは、IMF(Internal Message FlowまたはFund)と呼ばれる新規プロジェクトも担当しています。このIMFプロジェクトの目的は大きく2つあります。

  • 内部システム間のevent deliveryを統一された方法で行うデータパイプラインの開発
  • LINEサーバシステムにおけるバックグラウンド処理を担当するコンポーネントの一つであるtalk-dispatcherの置き換え

この2つの目的は互いに関連性がないようにみえますが、これらの目的を達成するために、Apache Kafkaとストリームプロセッシング技術を適用することを考えています。Apache Kafkaは、LinkedInによって開発され、使われてきた大容量の分散メッセージングシステムです。ユニークな機能を多数提供していますが、一番重要な特徴は次のとおりです。

  • ディスクベースの永続化を提供すると同時に、ページキャッシュを活用してインメモリに引けをとらない高いスループットを実現しています。
  • 複数のconsumerが一つのtopic(queueのような概念)から複数回メッセージを取得できます。このようなやり方が可能なのは、クライアントがそのqueueからどこまでデータを取得したか、その位置を知らせてくれる「offset」を管理しているためです。

Kafkaエンジニアリングの基本やIMFプロジェクトのコンセプトアイデアなど、面白いテーマはたくさんありますが、今回はストリームプロセッシングの実装方法にフォーカスしてご紹介します。

ストリームプロセッシングフレームワーク

ストリームプロセッシングには、Apache Storm、Apache Spark、Apache Flink、Apache Samzaなど広く使われているフレームワークが複数あります。ここで、最初に採用したのはApache Smazaでした。
SamzaはKafkaと同様、LinkedInで開発されました。Kafkaと連携するように設計されているため、Kafkaとの統合に標準対応しています。基本的にはよく動作しましたが、サービスに直接的に影響を与え得るコアインフラを構築することを考えると、いくつかの懸念点がありました。

  • SamzaはYARNと連携するように設計されています。YARNはとてもうまく作られている分散リソース割当フレームワークであり、広く使われていますが、今回の使途には適さないと考えれらるいくつかの点がありました。
    • 当初バッチ処理のために設計されたものであり、ストリームプロセッシングは可能ではありますが、Hadoopを継承した一部の部分がストリームプロセッシングに適していないと感じました。
    • LINEのデプロイシステムであるPMC(Project Management Console. LINEの主要サービスを管理するために使用するツール。CMDB(configuration management database)のビルド機能と配布機能を合わせたサービス)との親和性がありません。
    • 全体のアーキテクチャをシンプルに保つことを意識した上で、今回のケースにおいてはYARNの主要な機能であるリソースのアイソレーションや割り当てといった機能は必要ありませんでした。その理由は次のとおりです。
      • サーバは、基本的にサービスごとに別途割り当てられます。
      • 物理メモリを消費するのはほとんどがheapですが、JVMはheapサイズの上限を制限するオプションを持っています。
      • アプリケーションの特性上、CPUは問題になりません。
      • ネットワーク問題があることは想定されますが、YARNにはネットワークI/Oを制限する機能がありません。
  • 社内のエンジニアリング設備は、「host」の概念に徹底的に従っています。例えば、サービスのモニタリングに使用する独自開発ツールであるIMONは、hostごとのメトリクスを確認したりアラームを送信したりすることができ、Kibanaはhostごとのログを保存します。そのため、jobを実行するhostの決定をYARNに任せるには、さらなる作業が必要でした。
  • Apache SamzaのDevelopment activityはあまり活発ではないように見えました。

言うまでもなく、YARNはリソースプール上で多くのjobを実行するためには有効です。現在、統計用jobや調査用に実行されるアドホックなjobの実行に用いられています。

Kafka Streams

2016年3月10日、Confluent社(LinkedInでApache Kafkaを開発した人々が設立した会社)が、 「Introducing Kafka Streams: Stream Processing Made Simple」というタイトルのブログ記事を投稿しました。この記事を読んで、Kafka Streamsは我々が求めていたものに限りなく近いと思いました(この記事を読むまでは、独自の実装を開発しようかとも思っていました)。他の一般的なストリームプロセッシングフレームワークが「実行フレームワーク」であるのに対し、Kafka Streamsは「ライブラリ」です。Apache Samzaから継承した概念も一部ありますが、重要な差があります。

  • ただのライブラリであり、実行フレームワークではないため、ユーザーが手動で実行させる必要があります。特定のフレームワーク上で実行するか、またはpublic static void main()を使うかは、開発者に委ねられています。
  • Kafkaのプリミティブな機能を活用し、コア機能を最小のコードでシンプルに実装しています。
  • シンプルなDSLを使用してプロセッシングトポロジーを定義できます。
  • Kafkaのコミュニティが開発をしているので、非常に活発な開発活動を期待できます。
  • Rolling restartに対応しています。この特徴は、単一のインスタンスのみをアップデートした上で、プロダクションのトラフィックを受けながら動作確認するためにも便利です。

Kafka Streamsは、Kafkaバージョン0.10.0で公開されました。私たちが初めてKafka Streamsを試してみたのはまだリリース前のときだったので、ソースリポジトリから自前でartifactをビルドする必要がありました。なお、Kafka Streamsは、バージョン0.10.0.0以上のKafka brokerを必要としますが、現在使用しているクラスターのバージョンは0.9.0.1です。従って、互換性のないプロトコルをダウングレードするために、クライアントライブラリを手動でパッチするなどの多少筋が悪い作業が必要でした(もちろん、新しいバージョンがリリースされる次第、クラスターをアップグレードする予定です)。それでも、新たに実装を自前で作るよりは簡単な作業だっだと考えています。

次は、Kafka Streamsが提供する特徴的な機能について説明します。

Masterless

Kafka Streamsは、障害検知、処理ノード間のコーディネーション、パーティションの割り当てなどを行うために通常の分散システムに存在するマスターという概念がありません。その代わり、Kafka独自のコーディネーションメカニズムに全面的に依存しています。つまり、worker間の直接的な通信はありません。KafkaStreamsのインスタンスを作成すると、与えられたapplicationIdに対するconsumerの一つとして、Kafka brokerにsubscribeします。リバランスが必要な場合、またはfailoverが発生した場合は、Kafkaのbrokerがそれを検知して処理するため、worker間の通信は不要です。

High-level-DSL APIとLow-level API

Kafka Streamsは、ストリームプロセッシングのプログラミングのためにHigh-level-DSLとLow-level APIの2つのAPIに対応しています。

High-level-DSL

多くの場合、ストリームプロセッシングは、ストリームにtransform、filter、join、aggregateなどの処理を適用し、その結果を保存するといった流れになります。このように基本的な演算処理を行うには、High-level-DSLインターフェースが適しています。High-level-DSLを使用すれば、Scala Collections APIとかなり類似した形でcollection、transformなどの演算をプログラミングできます。以下は、IMFプロジェクトでloopback topic replicatorを使用した例です(loop topic replicatorの用途は、特定の用途に合わせてオリジナルのtopicのメッセージをフィルタリングし、新しいtopicに保存することです)。

KStream<Long, OperationLog> stream =
     builder.stream(sourceTopic.keySerde(), sourceTopic.valSerde(), sourceTopic.name());
 
Map<String, Set<OpType>> categories = loadCategories();
for (Map.Entry<String, Set<OpType>> entry : categories.entrySet()) {
    String topic = entry.getKey();
    Set<OpType> opTypes = entry.getValue();
    TalkOperationLogV1 destTopic =
            InternalMessageTopics.getTopic(topic, TalkOperationLogV1.class);
 
    stream.filter((key, value) -> {
              TalkOperation op = value.getOperation();
              return op != null && opTypes.contains(op.getOpType());
          })
          .to(destTopic.keySerde(), destTopic.valSerde(), destTopic.name());
}
Properties props = loadStreamsProps();
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

目的は、オリジナルのtopicから派生したtopicをopTypeカテゴリ別に構成することです。これは次のように、直感的な数行のコードで表現できます。1) topicとKey-value serializerを指定してKStreamを作成る。 2) 各要素にフィルターを適用する。 3) 結果を該当するカテゴリのtopicに保存する。

     builder.stream(sourceTopic.keySerde(), sourceTopic.valSerde(), sourceTopic.name());
 
...
 
stream.filter((key, value) -> {
          TalkOperation op = value.getOperation();
          return op != null && opTypes.contains(op.getOpType());
      })
      .to(destTopic.keySerde(), destTopic.valSerde(), destTopic.name()); 
      // Store the resulting messages to the topic pointed by destTopic

Low-level API

一般的ではない処理方式を要するいくつかの稀なケース(メッセージを内容別に特定のダウンストリームに送信するなど)は、Low-level APIを使用すべきです。Processor APIは大体直感的なので、特に分かりにくいところはないでしょう。Low-level APIが実際に適用された事例を知りたい場合は、公式リポジトリの「examples」ディレクトリをご確認ください。基本的にはHigh-level-DSL APIの使用が推奨されますが、必要に応じてLow-level APIを使用しても問題ありません。

Fault-tolerance local state DB

ストリームプロセッシングを実装する過程では、多様な目的のためにstateを保存しておく必要があります。ローカルstateは、一般にaggregation、join、windowingを実装する際に使用しますが、他の使い方もあります。Kafka Streamsでは、各プロセッサが固有のstate storeを持ちます。

Kafka Streamsのchangelog機能のお陰で、障害が発生してプロセッサが他のhostにfailoverされれば、state DBも新しいプロセッサに移動できます。Kafka Streamsがstate DBのための物理store(pluggableなので、インメモリstoreやRocksDBなどでいくらでも置き換えられる)をアップデートする間、「changelog」という特別なtopicのためのメッセージを生成します。Changelog topicは、ローカルstateのWAL(Write-Ahead-Log)とみなすことができます。Kafkaのtopicは何回でもconsumeできるので、プロセッサにfailoverが発生するたびに新しいプロセッサがchangelog topicからmutation logをリプレイし、ローカルstate DBを復元できるようになるわけです。つまり、プロセッサstateを保存するために外部ストレージを使用する必要がないということです。KafkaとKafka Streamsだけで完結することができます。

Kafka Streamsを利用して実装したもの

Loopback replicator

Kafka Streamsを用いて、Kafka topic replicatorを実装しました。これは、単純にクラスター間でtopicを複製するためのものではなく、マップ/フィルターなどの演算をメッセージに適用して派生topicを生成することに使います。主に、オリジナルのtopicからメッセージを分類して容量の少ない派生topicを提供し、consumerにより少量のメッセージを消費させることで、ネットワークトラフィックとリソース使用量を減らすために多く使います。
例として、TalkOperationログを挙げます。TalkOperationはLINEクライアントとtalk-server間のコミュニケーションに用いられるデータ構造です。ピークタイムにおいては、topicに送られるメッセージ数が1秒当たり100万件に達します。一部のconsumerはすべてのデータを必要としますが、一部のデータのみを必要とするconsumerもあります。
例えば、あるconsumerがLINEのコンタクト機能に関するデータ(ADD_CONTACT、BLOCK_CONTACTなど)にだけ関与したいのであれば、ストリーム全体を消費する必要はありません。その場合は、このloopback replicatorを使用して、コンタクト機能に関する演算だけを持つ派生topicを提供します。現在のところ、loopback replicatorは特別な機能のない単体のJavaアプリケーションとしてデプロイされていますが、これまで問題が発生したことはありませんでした。テストのために人為的なfailoverを数回発生させましたが、問題なく動作しました。

Decaton

Decatonは、冒頭で触れたtalk-dispatcherを置き換えるためのバックグラウンド処理システムです。既存のtalk-dispatcherには次のような問題点がありました。

  • 処理がスケールしない。 Talk-serverで作成されたすべてのタスクは、同じhost内で動作しているローカルのRedisのqueueに登録されます。Consumerであるtalk-dispatcherも同じサーバで駆動し、ローカルのRedisインスタンスにあるタスクのみ処理します。タスクを読み込んで処理するインスタンスは一つしかないため、サーバにburstが発生するとqueueのサイズが非常に大きくなります。
  • インメモリqueue。 queueサーバとしてRedisを使用しているため、インスタンスがなんらかの理由で終了すれば、queueの内容もすべて失われます。また、物理メモリの限界のため、queueに登録できるタスク数も大きく制限されます。このような限界があるため、大量のタスクを失ってしまった経験が過去に何回かありました。
  • Out-of-order処理。 私たちはuserIdを基にリクエストをルーティングしないので、UserAが送ったリクエストははどのtalk-serverでも受ける可能性があります。同じユーザに対する複数のタスクが、同時に別々のhostのqueueに発生し得ます。このようなタスクは、それぞれ違うtalk-dispatcherのインスタンスによって消費されるため、処理順と登録順は一致しない可能性があります。

Kafkaのtopicをqueueとして用いることにより、以下の機能を獲得できます。1) 拡張性のある分割されたqueue、2) 高速でディスクベースの永続的なqueue、3) メッセージkey shufflingを利用したkey毎の逐次処理保証(場合によって異なりますが、ここではuserIdをkeyとして使用することを例として挙げています)。

上図は、Decatonの動作を簡単に表現したものです。上述のとおり、これはtalk-dispatcherの3つの大きな問題点を解決する一方で、プロセッサ間のアイソレーションを保つようにも設計されています。Kafkaのtopicは不揮発性です。この特性のお陰で、同じタスクに対しそれぞれ違う処理を実行する各プロセッサが独立してタスクを消費できます。処理の実行を隔離することで、同じconsumerコンテキスト内で関連のないタスクを処理する途中で発生したストレージ呼び出しの遅延または失敗などによって、処理がブロックされることを防止します。

上図の例だと、TaskProcessorAとTaskProcessorBは、HBaseサーバに障害が発生して処理が無期限ブロックされる場合にも、StorageMutationProcessorから影響を受けません。タスクがqueueに積まれることになりますが、Kafkaのtopicが永続的でサイズに制限がない(ディスクベースなので)ので、それも問題になりません。

Decatonは、任意のtopicにメッセージを割り当てる機能を提供するためにLow-level APIを利用した事例の一つです。これを実現するためのAPI改善がKAFKA-3497において行われました。

Kafkaへの貢献

Kafka Streamsを利用してソフトウェアを開発する中で、いくつかのバグや改善事項を発見したため、issueやパッチのコントリビューションを行いました。

Kafka Streamsはまだ開発の初期段階なので、ユーザーの意見にかなり柔軟に対応してくれます。Confluent社のエンジニアたちのサポートを受けているコミュニティなので、対応もとても早く、コントリビュータとしては気軽に参加できます。Kafkaコミュニティは完全にオープンされているため、一部のメンバーによってのみ行われる非公開議論などもありません(少なくとも自分の知る限りはですね。

このような新規開発のソフトウェアに貢献することは大きな意味があります。そのソフトウェアの開発方向性に直接的に関わることができるからです。コミュニティと力を合わせて、自分に必要なものを開発できるということは、楽しい作業だと思います。

終わりに

Kafka Streamsは、上述したように、まだ開発の初期段階のものですが、今後の発展に大きく期待できます。サービスのパフォーマンスや信頼性に直接的に影響するコアアプリケーションとの連携を考えてデザインされた、初のストリームプロセッシングフレームワークであるからです。Kafka自体も良く設計されているので、信頼して利用できるミドルウェアの一つだといえます。もしKafkaを利用して分散ログのインフラを構築する計画であれば、Kafka Streamsの使用も考慮してみてはいかがでしょうか。

興味がある方々のために、参考文書をいくつか記載しておきます。

Related Post