LINE株式会社は、2023年10月1日にLINEヤフー株式会社になりました。LINEヤフー株式会社の新しいブログはこちらです。 LINEヤフー Tech Blog

Blog


Kafkaを利用したジョブキューライブラリ「Decaton」の活用事例

先日 LINE が開発するライブラリ Decaton が OSS として公開されました。
Decaton は Kafka を利用したジョブキューライブラリで、LINE 社内で幅広く利用されています。
GitHub - line/decaton: High throughput asynchronous task processing on Apache Kafka

今回の記事では、LINE で Decaton がどのように利用されているか、実際に利用されているプロダクトの実例を交えて紹介します。

Decaton とは?

Decaton は LINE 社内で非同期処理を行う際のジョブキューとして利用されているライブラリで、データストアとして Kafka を利用しています。

Kafka には、ストリーム処理を扱う公式のライブラリとして Kafka Streams があります。しかし、Kafka Streams では当時の LINE 社内の要求を全て満たすことができず、別途 Kafka を用いたライブラリである Decaton を開発することになりました。

Decaton は Kafka Streams に比べ効率的にメッセージを処理でき、プログラム自体もシンプルに構成できます。
たとえば、Decaton では一つの Kafka パーティションを複数のスレッドで同時に処理する機能があります。この機能は、非常に大量のメッセージを処理しなければいけないアプリケーションにとって、大変有用です。

次からは、Decaton のこれらの機能が実際にどのように生かされているか、実例を用いて順に説明していきたいと思います。

導入したプロジェクトの概要

今回ご紹介するプロジェクトはスマートチャンネルです。
みなさんが LINE でメッセージのやり取りをする際、トークタブの一番上に天気予報やニュースなどのコンテンツが表示されているのを見かけた方がいるかも知れません。それがスマートチャンネルです。

スマートチャンネルは、ユーザー一人ひとりに最適なコンテンツを配信するシステムです。これらのコンテンツは、ユーザーのリクエストが来た時、コンテンツの候補の中からリアルタイムに計算し、最適なものが選ばれています。

スマートチャンネルは、日本・台湾・タイでLINEを利用する全ユーザーを対象とした、非常に高トラフィックなサービスです。そのため、一部のジョブを非同期に実行したり、イベントログを集計するために Decaton を使っています。

スマートチャンネルの成り立ちについてより詳しく解説された記事はこちらです。
【Product Story #3】ユーザー調査とテストを徹底的に繰り返し、反対派も巻き込みローンチに至った「スマートチャンネル」開発プロジェクトの裏側 - LINE ENGINEERING

シンプルなジョブキューとしての Decaton 利用例

Decaton を利用すると、Kafka をバックエンドとしたジョブキューをシンプルに実現できます。スマートチャンネルでは、配信するコンテンツを更新するためのジョブ実行に Decaton が利用されています。

スマートチャンネルでは、ユーザーへ配信するコンテンツをあらかじめバッチで取り込んでいます。その後、取り込んだコンテンツへ更新が発生したときに Decaton を利用しています。

スマートチャンネルではニュースや天気予報などを配信しています。
これらのコンテンツは、LINE NEWS などスマートチャンネルに連携する各サービスから API で連携し取り込まれています。

コンテンツの更新があると、スマートチャンネル側の REST API が叩かれます。
この API では、Decaton のタスクとして Kafka へ書き込みます。実際の更新処理は行いません。

実際の更新を行っているのは、別に用意されたワーカープロセスです。
ワーカーでは、Kafka に保存された Decaton のタスクを取得し、実際の更新処理を行います。

このように、REST API を呼び出された時点で処理するのではなく、Decaton のタスクとして保存することにはいくつかの利点があります。

  • 一時的なアクセス集中による高負荷を回避可能
  • ジョブのリトライ処理を Decaton が行うため、追加の実装が不要

まず、更新ジョブを即時に処理するわけではないため、一時的に更新ジョブが増えた場合でも高負荷になりにくい構造です。更新を行う REST API 自体がダウンし、更新自体を受け付けられなくなる最悪の自体を防げます。

また、Decaton を用いるとジョブのリトライが容易です。
Kafka はメッセージの配送のみに責任を持つため、ジョブのリトライ処理を行いたい場合は自前で実装する必要があります。しかし Decaton を使うと、リトライ処理をわざわざ実装する必要が無くなります。
Decaton はリトライ用の Kafka topic を別途持ち、それを用いてリトライになったタスクを管理しますが、ライブラリ利用者はあまり意識すること無く一つの Kafka topic を処理するようプログラムを書けるようになっています。

さらに、素のままの Kafka を使うより Decaton を用いたほうが、より効率的にジョブを処理できます。Decaton を使うと、単一の Kafka パーティションを複数スレッドで並列に処理できるようになるためです。

以上のように、スマートチャンネルではジョブキューとして Decaton を利用したことにより、信頼性の高いプログラムを見通しよく作ることができました。

Decaton の遅延タスクを利用したイベントログの集計

Decaton には、ある処理を指定時間遅延させた後に実行する、遅延タスクの機能があります。
スマートチャンネルでは、この機能を用いてコンテンツに対するクリックなどの各種イベントの集計を行っています。

スマートチャンネルでは、表示したコンテンツに対していろいろなイベントを取得しています。その中でも主要なイベントが3つあります。インプレッション (コンテンツの表示)、クリック、ミュート (コンテンツのクローズ) です。これらのイベント情報を使って、どのコンテンツを出すのが最適なのかどうか、学習を行っています。

これらのイベントを元に学習する為に、表示したコンテンツが最終的にどのような結果を生んだかを把握する必要があります。

スマートチャンネルでは、コンテンツ表示後のユーザーの主な動作として、以下の 3 つがあると考えています。

  1. コンテンツを表示 → コンテンツをクリック
  2. コンテンツを表示 → コンテンツをミュート (クローズ)
  3. コンテンツを表示 → 何もしない

1, 2 の動作に関しては、ユーザーが具体的なアクションを行うので、そのイベントを取得すれば把握できます。
しかし 3 に関しては、何もしないというイベントは発生しないため、どのような場合が 3 に該当するか、明示的にこちらで定義する必要があります。
今回は、コンテンツ表示後 10 分間に何もアクションがなければ 3 に該当すると定義することにしました。

これらのイベントを処理する方法として、イベントが来たときにそれをどこかへ保存し、後から取り出して利用するという方法が考えられます。しかし、ただイベントを保存するだけでは 3 の判定を容易に行うことが出来ません。保存しなければいけないデータも膨大です。

そこで、スマートチャンネルでは、この 3 の判定を容易に実装するため、Decaton にある遅延タスクという機能を活用しています。

以下は、Decaton を用いたイベント集計のアーキテクチャです。

まず、スマートチャンネルではユーザーのイベントが発生した際、HTTPS のリクエストで通知する構造になっています。その API では、イベントの発生を受け取ると、まず一度 Kafka へ書き出します。

その後は、イベントに応じた各種ワーカーで処理が行われます。
クリックやミュートのイベントは、これらのイベントが発生したことを Redis へ保存します。
インプレッションのイベントは、10 分後に処理する Decaton の遅延タスクとして、再度 Kafka へ格納されます。

処理を遅延したインプレッションのイベントは、10分経過後にそのインプレッションが 1, 2, 3 のどれに該当するか判定します。クリックやミュートのイベントには固有の ID が割り振られており、インプレッションのイベント情報から Redis に格納されたクリックやミュートのイベント情報を取得することができるようになっています。
もし、この時にクリックやミュートのイベントが発生していない場合、3 であると判断することができます。

実際のコードでは、以下のように Decaton のタスクを生成する時にどれだけ遅延して実行させるかを指定しています。

long timestamp = clock.millis(); // Get current UNIX timestamp in milliseconds
Duration delay = Duration.ofMinutes(10L); // Run the task after 10 minutes
TaskMetadata metadata =
        TaskMetadata.newBuilder()
                    .setTimestampMillis(timestamp)
                    .setScheduledTimeMillis(timestamp + delay.toMillis());
                    .build()
 
 
Task task = new Task(); // Task is a class generated by protobuf.
Serializer<Task> serializer = new ProtocolBuffersSerializer<>();
DecatonTaskRequest taskRequest =
        DecatonTaskRequest.newBuilder()
                          .setMetadata(metadata)
                          .setSerializedTask(ByteString.copyFrom(serializer.serialize(task)))
                          .build();
 
 
// After creating a Decaton task, submit it to Kafka topic.
producer.send(new ProducerRecord<>("topic", "key", taskRequest));

実は、当初は今回紹介した処理を Kafka Streams を使って実装しようとしていました。しかし、このような処理を Kafka Streams で効率的に実装するのが難しいことが分かりました。

その後、Decaton と Redis を組み合わせた今回の手法を考え試した所、シンプルなプログラムを組み合わせるだけで目的を達成できることが分かり、結果的にこの手法が取られています。

Decaton の遅延タスクの機能は、今回紹介したイベント集計以外にも、様々なことに利用できる便利な機能です。
利用する機会があれば、是非活用してみてください。

まとめ

今回の記事では、スマートチャンネルでの Decaton の活用事例を 2 つを紹介させていただきました。
スマートチャンネルでは Kafka Streams を使っている箇所も多いのですが、Decaton に置き換え可能な箇所は順次置き換えていこうと思っています。それにより、簡潔なリトライ処理や、単一パーティションの複数スレッド並列処理など、Decaton の恩恵を受けることが期待できます。

Decaton は大変便利なライブラリで、スマートチャンネルというサービスを提供する上で無くてはならないものです。幅広く活用できるライブラリだと思いますので、機会があれば OSS として公開された Decaton をぜひ試してみてください。
GitHub - line/decaton: High throughput asynchronous task processing on Apache Kafka

今後とも、Decaton とスマートチャンネルをどうぞよろしくお願い致します。

採用情報