LINE株式会社は、2023年10月1日にLINEヤフー株式会社になりました。LINEヤフー株式会社の新しいブログはこちらです。 LINEヤフー Tech Blog

HBaseとKafkaによるデータパイプライン構築。LINE Messaging Platformにおける活用法

2021年11月10日・11日の2日間にわたり、LINEのオンライン技術カンファレンス「LINE DEVELOPER DAY 2021」が開催されました。特別連載企画「 DEVDAY21 +Interview」では、登壇者たちに発表内容をさらに深堀り、発表では触れられなかった関連の内容や裏話についてインタビューします。今回の対象セッションは「LINE Messaging Platform におけるHBaseとKafkaのデータパイプラインと活用例」です。

LINEでは、Messaging Platformのストレージミドルウェアの1つとしてApache HBase(以下、HBase)を使用しています。HBaseのレプリケーション機能は、データ永続性のためのWAL(Write-Ahead Log:ログ先行書き込み)を処理するコンポーネントとして実装されており、プラガブルになっています。

私たちはこの機能を活用して、WALをApache Kafka(以下、Kafka)に送信するための独自のレプリケーション機能を実装し、データパイプラインを構築しました。そして、この数年間でデータパイプラインを活用したWAL駆動のアプリケーションをいくつも開発してきました。今回はパイプラインの概要や構築までの道のり、アーキテクチャの工夫などをZ Partチーム HBaseユニットの鶴原翔夢と吉田真也が解説します。

左から吉田さん、鶴原さん

データパイプラインによってレプリケーションの課題を解決

――HBaseユニットの役割とお2人の自己紹介をお聞かせください。

吉田:HBaseユニットは、LINE Messaging Platformのサーバーサイドで用いられているHBaseクラスタの構築や運用をしています。また、HBaseの各種パッチの開発やアップストリームのパッチにも貢献しています。

また、HBaseクラスタの運用だけではなく、サーバーサイドアプリケーション内におけるHBaseへのデータアクセスロジックやストレージ実装なども開発しています。HBaseクラスタのためのHadoopクラスタやZookeeperクラスタの構築や運用も行っています。

私はこのチームでサーバーサイドエンジニアとして働いており、先日に開催された「LINE DEVELOPER DAY 2021」では、HBaseとKafkaのデータパイプライン構築の事例についてお話ししました。

鶴原:サーバーサイドエンジニアの鶴原翔夢です。吉田さんと同じチームで働いています。

――HBaseとKafkaでデータパイプラインを作成した経緯についても教えてください。

吉田:私たちがデータパイプラインを構築したのは2017年です。当時は、本番環境のクラスタにHBaseのバージョン0.90.6を使用していました。そして、別チームが管理しているstatsクラスタと呼ばれるHBaseクラスタに、統計解析のためのデータをレプリケーションしていました。statsクラスタのHBaseのバージョンは0.94でした。

私たちが用いていたバージョン0.90.6のリリースは2012年と古かったため、2017年にはすでにコミュニティからのサポートが受けられない状態になっていました。そこで、2017年にリリースされたバージョン1.2.5へのマイグレーションプロジェクトを進めていました。

――どのような方法でマイグレーションを実施しましたか?

吉田:最初はバージョン1.2.5で構成される新しいクラスタを作り、アプリケーションサーバーから古いクラスタと新しいクラスタの両方に書き込みをします。そして、古いクラスタから新しいクラスタにデータのコピーを行うことによって、マイグレーションを進行しました。

古いクラスタへの書き込みを止めて統計解析を引き続き行うためには、新しいクラスタからstatsクラスタへのレプリケーションをセットアップする必要があります。しかし、バージョン1.2.5のレプリケーション機能はバージョン0.94へのレプリケーションをサポートしていませんでした。

――なぜでしょうか?

吉田:2013年にリリースされたバージョン0.96は、HBaseのコミュニティでSingularity(特異点)と呼ばれており、通信プロトコルの大幅変更が行われています。また、2015年にリリースされたバージョン1.0ではAPIのクリーンアップが行われています。つまり非互換変更が入っていたため、バージョン1.2.5からバージョン0.94へのレプリケーションができなかったんです。そこで私たちはHBaseとKafkaのデータパイプラインを構築することで、この問題を解決しました。

HBaseの機能を有効活用してアーキテクチャを構築

――データパイプライン構築は一筋縄では行かないように思います。

吉田:確かに難易度の高いプロジェクトでした。この事例では、HBaseのPluggable Replication Endpointという機能を活用し、パイプライン構築に役立てることにしました。

HBaseにはクラスタ間のレプリケーションの機能が標準で備わっています。仮に、SourceクラスタからDestinationクラスタへのレプリケーションをセットアップした場合、各RegionサーバーではReplication Sourceと呼ばれるスレッドが起動します。

Replication Sourceは、HDFS上のWALファイルを読み込み、WALの各エントリをReplication Endpointに渡します。HBaseではこのReplication Endpointがプラガブルな形式で提供されており、機能拡張が可能になっています。

私たちはこの機能を用いて、独自に定義したReplication EndpointがWALのエントリを独自のプロトコルでKafkaに送信するようにしました。KafkaからWALのエントリのデータをコンシュームしたリプレイヤーは、0.96のHBaseクライアントとプロトコルを用いて、statsクラスタへと書き込みを行い、バージョン1.2.5へのマイグレーションを実現しました。

――この事例のように、HBaseの特定のクラスを継承して独自の定義をすることで、各種コンポーネントの挙動を変えているケースは他にもありますか?

吉田:HBaseには、書き込みや読み込みのリクエストを受け取った際に追加の処理を挟み込める、コプロセッサという機能があります。私たちはその機能を拡張して、わざと処理を失敗させるコンポーネントを作り、テスト用の環境でカオスエンジニアリングを実施するために用いています。

また、開発環境でのデータの不整合の調査のためにもコプロセッサを活用しています。Messaging PlatformではHBaseとRedisにデータを保存しているのですが、リトライ処理のバグなどによりこれらの間で不整合が発生してしまうことがあります。その際、アプリケーションのログなどを元になぜこの不整合が起きたのかを調査する必要があります。その調査を簡単に行うためにコプロセッサを使って、HBaseにデータを送信したクライアントの情報、例えばホスト名やどのクラス、メソッドから送られたか、実際の送信日時などをデータのメタ情報として保存します。

HBaseのクライアントサイドでは、HBaseクラスタとの通信を行うクラスを拡張し、RPC(Remote Procedure Call)をインターセプトするというライブラリを開発しています。このライブラリを用いることでHBaseのRPCの前後に様々な処理を追加できるようになります。実際の活用例としては、HBaseの性能評価のために、HBaseクラスタへの読み込みや書き込みのリクエストをKafkaに送信しています。HBaseの新しいバージョンや新機能などの調査のために、テスト用のHBaseクラスタに対して負荷を掛けて機能や性能などを評価することがあります。その際にベンチマークツールによるワークロードでの評価だけではなく、Kafkaに送信された読み込みや書き込みのリクエストをテストクラスタに対して送信することで、実際のサービスのワークロードを再現して評価しています。

鶴原:特定のインターフェースを継承したクラスを実装することで処理を差し替え可能な機能は、Hadoopのコードベースに多い印象があります。HBaseはHadoopから派生したプロジェクトですから、その流れを汲んでいるのだと思います。

――データパイプライン構築において工夫した点をより深く伺いたいです。

鶴原:このプロジェクトでは、バージョン1.2.5からバージョン0.94へと、データを漏れなくレプリケーションすることが最も大切でした。HBaseは分散データベースであり、Kafkaも分散処理のためのミドルウェアですから、複数台のサーバーによる運用が前提となっています。いずれかのサーバーに障害が生じたとしても、絶対にデータを欠損させるわけにはいきません。このデータパイプラインを構築した後、かなり入念にテストをしました。

吉田:仮にサーバーが落ちてもデータを失うことなくレプリケーションできるか試すため、自分たちでテスト用のKafkaクラスタを構築して、処理の途中で特定のKafkaサーバーを落とす方法で動作検証しました。ちなみに余談ですが、私がLINEに社員として入社したのは2018年で、2017年当時は新卒内定者としてアルバイトをしていました。鶴原さんにメンタリングしてもらいつつ、このデータパイプライン構築のプロジェクトに携わりました。

鶴原:懐かしい。あの頃はまだ吉田さんがアルバイトでしたね。

吉田:良い機会なので鶴原さんに質問したいのですが、データのやりとりをする際にKafkaを経由させるアーキテクチャを選んだのはどうしてですか?

鶴原:システムアーキテクチャ構築において、処理をバッファリングしてくれる中間層を挟むと、システムの柔軟性や耐障害性などを向上させることができるんです。

HBaseからHBaseに直接レプリケーションするアーキテクチャにすると、データ書き込み先のHBaseが不安定になった場合に、書き込み元のHBaseも影響を受けてしまうケースがあります。Kafkaを経由することで、こうした障害の影響を緩和できます。

それから仮に、あるデータセンターから距離の遠い場所にあるデータセンターへとレプリケーションする場合、HBaseからHBaseへと書き込みのリクエストをやり取りすると通信のレイテンシーの影響を受けてしまいます。中間にKafkaを挟むことで、リクエストをバッファリングできるため、レイテンシーの影響を抑えられます。

それから、 Kafkaのコンシューマアプリケーション内で任意の処理を実行することで、リアルタイムにデータ分析などが実施可能になるという利点もあります。そうした理由からKafkaを用いました。

今後はさらにデータパイプライン活用の幅を広げていく

――データパイプラインを構成する各要素についてもご説明ください。

吉田:HBase0.94のクラスタへのリプレイヤーが1.2.5のクライアントに依存しないように、Kafkaに送信するデータのプロトコルはProtocol Buffersを用いて独自に定義しています。定義したデータ構造は、WALのエントリやHLogKeyといったWALのメタ情報などです。それに加えて、HBaseの変更の単位であるCellというデータ構造を定義しました。これらはHBase1.2.5で用いられているプロトコルと同様のデータ構造です。

KafkaにWALのエントリを送信するためのReplication Endpointは独自に定義し、Kafkaの送信先であるトピックはテーブル名とprefix、suffixを用いて決定します。Kafkaに送信するためのkeyにはリージョンの識別子であるEncoded region nameやCellごとのRowkeyなどを指定し、valueとしてデータを先ほど定義したプロトコルに変換します。

KafkaのReplication Endpointのセットアップは、HBase Shellとadd_peerコマンドを用いて行います。Endpointのクラス名を指定し、Kafkaの接続情報やクライアントの情報、そしてトピック名のprefixやsuffixを指定します。

HBase0.94のクラスタにデータを送信するリプレイヤーは、KafkaからWALのデータをコンシュームし、そのWALのエントリを0.94のmutationへと変換を行い、0.94のライブラリを用いて指定されたクラスタに書き込みます。

私たちが構築したHBaseとKafkaのデータパイプラインは、一般的には変更データキャプチャという名称で知られるアーキテクチャです。このパイプラインによってデータベースの変更に基づく処理を簡単に実現でき、かつ信頼性の非常に高いデータ連携を行えます。

一方で、留意する点として、非同期で処理が行われるため必然的にデータ反映の遅延が生じる可能性があります。また、あるデータの変更が発生したタイミングで他のデータやカラムに対してアクセスを行うためには、コンシューマー側でのアグリゲーションやデータベースへの適宜のアクセスを実施する必要があります。

――詳しい解説をありがとうございました。最後に今後の予定についても教えてください。

吉田:現在、社内では複数のサービスがこのデータパイプラインを活用しています。今後も適用範囲をさらに広げていきたいです。今後検討されている活用例として、「LINE DEVELOPER DAY 2021」ではSecondary indexとIncremental backupについてお話ししましたので、よろしければ詳細はセッションの動画をご覧ください。

鶴原:吉田さんが話してくれたように、今後もLINE社内でデータパイプラインを活用する場所をもっと拡充していきたいですね。それから、この事例で挙げた設計手法は利点が非常に大きいことがわかったので、今後はHBase以外のデータベースやミドルウェアなどでも、パイプライン構築に挑戦してみたいです。

採用情報

LINE株式会社では一緒に働くエンジニアを募集しています! 
今回のインタビューと関連する募集ポジションはこちらです。