LINE株式会社は、2023年10月1日にLINEヤフー株式会社になりました。LINEヤフー株式会社の新しいブログはこちらです。 LINEヤフー Tech Blog

Blog


「カジュアル」な規模のデータクラスター上でのデータ解析処理

今年はさだまさしさんのデビュー40周年ということで、記念コンサート「さだまつり」も絶賛開催中の折も折、残暑も厳しい中皆様いかがお過ごしでしょうか。大平です。

さだまさし氏は経験の豊富な方ですので彼の歌や発言から学ぶことは大変多いのですが、個人的に非常に感銘を受けているのは「歌はコンサートで成長する」という言葉です。歌い手として、「歌」という作品を作って公開・販売するだけが仕事ではなく、実際にコンサートなどでお客さんに届け、お客さんの反応を参考にしたり日々の演奏活動の中で試行錯誤を繰り返して、内容をブラッシュアップし洗練させていく過程を指して先の言葉があるのだと思います。実際にさだまさし氏の曲はCDに収録されているものと実際にライブで演奏されるものとでアレンジが大きく異なり、かつ作品としても質が向上しているものが少なからず存在します。

…あまりさだまさしの話を続けると本当に上長に叱られますので、、まあ何が言いたいかというと、サービスも同じで、機能を開発して終わりではなく、いかに機能を洗練させ、安定して稼働させるようにするか、運用やチューニングの作業は非常に大事です。
この記事では、あるデータ解析基盤におけるチューニングの過程についてお話できればと思います。

LINEサービスの統計解析基盤

現在私は、LINEの統計解析基盤の開発を担当しています。もっとも基盤自体は一年以上前から運用されており、私は直近の作業としてはここ1・2ヶ月ほどは主に基盤のデータ構造の見直しとチューニングを担当しておりました。

LINEの統計解析基盤は、多くの会社にて同様のシステムは存在すると思いますが、LINEのログデータやRDBのデータなどを使用して、主にKPIの参考になる様々な統計的な数値を算出し表示するシステムです。他にサービス運用のための補助的な機能などもいくつか存在します。

システムアーキテクトについて、特に統計処理に関連する処理を抜き出すと以下のような感じになります。

Input
  • ユーザーの操作履歴ログ
    • Redis Queueから取得(Streaming)
  • マスターデータ(ユーザーデータ、スタンプ情報など)
    • MySQLから(Batch)
Processing
Output
  • 各種統計結果
    • MySQLに保存(Batch)

LINEサービスは一般的なWebアプリケーションとはアーキテクトが異なるのでアクセスログ的なものは存在しません。なのでユーザーの操作履歴ログをRedis Queueを介して非同期で取得し、そちらを解析対象としています。
また解析処理は、定番の組み合わせですがHadoopとHiveを用いて行っています。

直面していた問題

ありがたい事にLINEサービスは多くの方に使用いただいています。ユーザーの増加に比して、バックエンドの解析基盤で処理を行わなければいけないデータのサイズも肥大化の一途をたどっています。
たとえばユーザーの操作履歴ログは、直近では1日あたり700GB弱のサイズになっていました。

これらログデータとマスターデータとを組み合わせて、日に数十種の項目の解析処理を実施しています。

このような状況のため、日々のデータ解析処理も日に日に時間がかかるようになり、たとえば1日に1回実施している大きめのdaily batch processは最大で11時間半ほど時間がかかるようになっていました。

そこで、直近の問題を解決するために、解析処理のチューニング…出来る限り処理を早く終わらせるようにする…を行なう必要があった訳です。

チューニングに先立って

さてチューニングだ!と意気込むのは良いのですが、実際の話でいうと私はチームに合流したばかりで、システムの状況…何が問題で、何が遅いのか…が正確には把握できていませんでした。そのため、とっかかりとしてまずは情報の可視化を行いました。

既存の解析基盤では、解析処理はほぼすべてPythonで書かれていて、PythonからHive Thrift Serverを介してHQLを投げることで解析処理を実施していました。もちろん処理のログなどは比較的しっかりと出力・保存されていたのですが、たとえば各解析処理の処理時間や、発生したエラーなどを確認するためにはサーバーにログインする必要がありました。
これらを手っ取り早く可視化する手段として、Hadoopと親和性の高いワークフローエンジンとして有名な「Azkaban」を採用し、そちらで解析処理のコントロールを行なうようにしました。

Azkabanについての詳細な解説についてはこの記事では割愛しますが、本来の目的であるワークフローの制御以外にも、個々の処理の実行時間がグラフで可視化されるため、どの処理が遅いのかを特定するのに便利です。
ですので、既存の解析処理をすべてAzkaban上でコントロールするように変更し、処理の内容やかかった処理時間を可視化し、こちらをとっかかりにチューニングポイントを探ることにしました。

チューニングポイント

処理速度上の問題点として見えてきたのは以下でした。

  • マスターデータ(MySQL)のHadoop・Hiveへのインポート処理が遅い
  • MapReduceタスクの稼働数がキャパシティを超えている
    • Hadoop上のデータ構造に問題
    • そもそものサーバーリソースの不足

まず、データのインポート処理ですが、マスターデータの内容をHiveに反映させるために定期的にデータの更新を行っていたのですが、その処理がsingle processで動作するようになっており、データの急増に伴い処理時間が増えていました。かつHadoop Clusterの台数を増やすなどの方式で解決しないスケールし辛い仕組みになっていたため、改善が急務となっていました。

MapRedeuce処理のキャパシティ不足は根本的にはサーバーリソース不足が9割以上の原因のため基本的にはサーバー増設しか策がありませんでしたが、Hadoop上に保存されているデータ構造にいくつか改善可能なポイントがあったためそちらにも手を加えました。

インポート処理の改善(Sqoop)

データインポートについては、SqoopというOSSを採用しました。
Sqoopは現在はApacheのTop-Level Projectとして開発が進んでいるOSSで、主にRDBとHadoop・Hive間で効率的にデータのインポート・エクスポートを行なうためのツールです。
基本的な仕組みについての図はClouderaさんのブログから引用させていただきます。

http://www.cloudera.com/blog/2012/01/apache-sqoop-highlights-of-sqoop-2/ より引用)

Sqoopは基本的にコマンドラインで動作するツールです。いくつかのオプションを指定することで、いい感じでRDBとHadoop・Hive間のインポート・エクスポート処理を行ってくれます。
(参考:Sqoopのimportコマンドのオプション一覧

Sqoopでインポート処理を実行すると、MapReduceのJob(正確にはMap Only)が起動し、Map Task中で設定内容に応じて元データを持つデータベースに接続しdumpを行い、その結果を直接HDFSに書きこむ、という動きになっています。

https://blogs.apache.org/sqoop/entry/apache_sqoop_overview より引用)

インポート処理の効率化の観点から言うと、以下の2点が高速化に寄与しています。

  • 並列的にインポート処理が行える
  • dumpしたデータを直接HDFSに書き込める

Hadoopを用いて比較的簡単に並列インポートが行える事も便利ですし、こういったインポート処理を自前のスクリプトで書くとまずローカルファイル上にdumpを保存しそれをHDFSにputするという何段階かの処理を経る事が多く、とくにHDFSへの大きいファイルのputはどうしても時間がかかる作業ですのでここで時間をロスします。そういう意味で、HadoopやHiveで使用するデータのインポートにはSqoopは比較的向いていると思います。

チューニング結果(Sqoop)

今回のSqoopの導入により処理が2時間ほど高速化されました。

データ構造の改善

続いてデータ構造の改善です。

LINEの統計解析基盤では、Hive Tableで扱うデータは基本的にはTextFile形式で保存されていました。もちろんTextのままの方が運用面では色々と便利な面もありますし以前はこれで性能的な問題は無かったのですが、これだけデータが大きくなるともう少し効率的なデータ形式に変更する必要がありました。また、扱うデータが大きいのでデータの圧縮についても気を遣う必要がありました。
他にも、データによっては1ファイルあたりのサイズが小さすぎる(HDFSの1ブロックサイズよりも極端に小さい)ものが存在しておりパフォーマンスに悪影響を与えていました。
そういったいくつかの点を地道に改善してみました。

TextFile→RCFileへのマイグレーション

まずはHiveの処理に向いていて、かつデータサイズを出来る限り小さくする事が可能なファイルフォーマットの選定です。色々調査していた結果、今回のケースではRCFileというデータ形式を採用するのが良いのではないか、という結論に達しました。
RCFileはHadoop(HDFS)上で、たとえばHiveのように構造的なデータを扱う際に適したファイルフォーマットです。詳しくは以下の論文を参照ください。
RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems (ICDE’11)

特に個人的に良いと思ったのは、データサイズを出来る限り小さくする工夫が随所に存在しているところです。メタデータのランレングス符号化もそうですし、同一のカラム単位でデータが保存されているので連続して同じような傾向のデータが出現する確率が高く、圧縮アルゴリズムによる圧縮が効きやすくなっています。
実際、他のデータ形式(TextFile、SequenceFile)と比較しても、RCFileがデータサイズ的には一番優秀でした。以下は、まったく同一のデータ群を、それぞれのファイル形式で無圧縮・圧縮状態で保存して、どの程度のサイズ差が出るかの比較です。

既存のデータの中にはTextFileでかつ無圧縮状態で保存されているものもありましたので、今回のタイミングでHive Tableのデータはすべて「RCFile+GZIP」の形式で保存するように変更しました。※実際はまだ一部マイグレーション中です。

細かすぎるファイルのマージ

HadoopのHDFS上では、ブロックサイズよりも極端に小さいファイルがたくさん存在するとあまり効率的に処理ができません。この辺は非常に有名な話で、Clouderaさんの「The Small Files Problem」というブログ記事がとても分かりやすく参考になります。

細かいファイルのマージは一般的にはHARを用いる事が多いようですが、今回はHiveのマージ関連のオプションを使用しました。

「hive.merge.mapfiles」「hive.merge.mapredfiles」で出力ファイルのマージ処理のon/off設定をします。サイズの調整は「hive.merge.size.per.task」「hive.merge.smallfiles.avgsize」にて行えます。こちらを組み合わせて、ブロックを有効に使えるようなサイズに調整してファイルを出力することが出来ます。

以下は若干机上の数値になりますが、実際マージ前後でどの程度の処理時間差が出るか、count文とselect文(where hoge=xxx)を発行してみた比較です。

チューニング結果(データ構造の変更)

今回測定したタイミングではすべてのデータのマイグレーションが終っていなかったので効果は中途半端でしたが、それでも1時間ほど処理速度が改善されたため、先に挙げた手法はチューニングの方向としては間違っていないと思っています。

サーバーの増設

さて、色々長々と書いてきましたが、チューニング作業には限界があり、最後にはサーバー増設(お金)に頼らなければいけないケースもあります。
今回についても本質的にはそういう状況だったため、既存のHadoop ClusterのDataNode/TaskTrackerの台数を倍に増やしました。

いっきに6時間も短縮・・・
まあ、なんというか、最終的にはお金が大事ですねという現実に直面した次第です。
(参考:現在の心境

実際の所、データ構造のチューニングによりポテンシャル的に処理は効率化されていましたが、それでもまだ存在した処理のキャパシティ不足がサーバー増設により解決され、何もしなかった時よりも処理速度の改善効果が大きかったのでは、と思っています。

まとめ

何はともあれ、様々な手段を駆使することで解析時間を9時間ほど短縮することができました。

最後に、今回のチューニング過程の中で感じた事をまとめ的に記載して、この記事を締めたいと思います。

以上、「カジュアル」な規模のデータクラスター上での解析処理チューニングの過程について書かせていただきました。
この記事の内容が皆様に何かしらのお役に立てれば幸いです。

Appendix.

統計解析基盤で現在扱っているデータはせいぜいPBクラスの「カジュアル」なサイズのデータですが、今後のデータサイズ増加を考えたときに現状より効率的なデータ構造を検討する必要があります。
また、現在はほぼDailyもしくはHourlyの解析が中心ですが、より素早くデータの解析を行い、リアルタイムに近い形で解析結果を表示する仕組みも必要になってきます。

データ構造については、現在個人的に関心を持っているのが「CIF(ColumnInputFormat)」と呼ばれるものです。データのローカリティに気を遣い同じデータスキーマを同一のDataNodeに割り当てるような工夫や、シリアライズやデータ圧縮などに工夫をして、処理の高速化を実現する手法とのことです。
まだ実際に試してみてはいないですが、論文ベースではデータの読み込み速度がRCFileより38倍速い、との話です。
Column-Oriented Storage Techniques for MapReduce(VLDB '11)
Sandeep's Research Notes: RCFile and CIF

また、実サービスでの例としては、Treasure Dataでは内部のデータ構造にMessagePackを用いて高い圧縮率を実現しているとの事です。
Five Criteria of Next Generation Data Warehouse | Treasure Data Blog

Check. We achieve a 5-10x compression ratio. Columnar data storage helps with compression considerably, but our secret sauce is a binary serializer called MessagePack. MessagePack is space-efficient and incredibly fast to serialize and deserialize. One of our co-founders is the original author of MessagePack, and we use it extensively throughout our stack.

こういった事例を参考にしながら、より効率的なデータ構造の採用を検討していきたいと思います。

データの準リアルタイム解析についてはいわゆるStreaming Processingの概念と、それを実現するミドルウェアが必要になってきます。
最も有名なものとしてはTwitter社が導入していると言われる「Storm」でしょうか。
また、ログコレクターとして有名な「Fluentd」についても、Aggregationを行なうプラグインなども存在しますし、工夫次第でStreaming Processingが可能になると思います。
こちらも色々と試してみたいと思います。

イケているサービスの裏側には、イケているデータ解析基盤が存在するのが通説(?)ですので、我々もできるだけその境地に近づけるよう努力していきたいと思います。