HBase Cross Row、Cross Tableトランザクション処理機能を実装してみました

こんにちは、LINEでGame Platformを開発している 趙 です。

この記事はLINE Advent Calendar 2016の9日目の記事です。

LINE Game Platformでは分散型でスケーラブルな高速データベースであるHBaseをメインストレージの一つとして使っています。HBase運用における問題のひとつは、cross row, cross tableトランザクション処理機能がない事です。Client Faultなどが起こった時の対応が難しいです。(例えば、HBaseにテーブルAとBがあり、ABの順番にデータを処理する場合、もしAの挿入の後にBの挿入に失敗した場合、データ不整合が起きます)

HBaseは幾つかの単行atomic apiを提供します。(HBase versionによって違うところがあります)

トランザクション処理機能はもともとHBaseでは強く求められませんが、運用中はできるだけデータベースの種類を減らしたい気持ちがありますから、HBaseでcross-row、cross-tableトランザクション処理機能をサポートする必要性がありました。すでにHBaseにおけるトランザクション処理のOSSが幾つか存在していますが、トランザクション処理要件、対応HBaseバージョン、導入の難易さ、社内ツールとの連携性などを検討して内製することになりました。設計中は主にPercolatorhaeinsaを参考にしました。

スペック

トランザクション処理は要件によって設計が異なります。私達の導入予定案件では、トランザクション規模は小さくて、WriteよりReadの方が明らかに多いです。そのため、スペックは以下のように定義しました。

インテグレーション方式は、導入と移行の便利さを考えてクライアントライブラリに決めました。使用者はプロジェクトにこのライブラリを導入するだけで済みます。HBase側の対応は必要ありません。

使い方は以下のような感じです。

 void exampleCode() throws TransactionConflictException {
  Transaction t = TransactionManager.begin();
 
  // generate normal HBase 'get'
  Get get = new Get();
  get.addColumn(...);
  // do 'get' in transaction t
  Map result = t.get("tablename", get);
 
  // generate normal HBase 'put'
  Put put = new Put();
  put.addColumn(...);
  // do 'put' in transaction t
  t.put("tablename", put);
 
  t.commit();
}

分離レベル

アプリケーション側でトランザクション処理をする場合に、性能に一番影響するのはHBaseにアクセスするときの通信コストです。高い分離レベルは優れた並列性を提供できますが、通信回数を増やせば性能は逆に落ちることがあります。

例えば、HBase cell versionを利用してMVCC(Multi Version Concurrency Control)で分離レベルSnapshot Isolationを実装してみると、データの取得は二段階になります。 まずは有効なデータバージョンを取得し、そして有効バージョンでデータを取得します。分離レベルSerializableの場合はロック情報とデータは同じ行に保存すると、アクセス時に一緒に取得できます。Snapshot IsolationよりHBaseアクセス回数が少ないです。そしてSnapshot Isolationの場合は、cellごとに有効バージョンを保存するカラムが必要です。テーブルのカラム数が倍になってしまうと(この場合の分離単位はcellです)ストレージとネットワーク通信に対して大きなデメリットがあります。分離レベルSerializableの場合はテーブルにロック情報を保存するカラムを一つだけ追加すればいいです。

Concurrency Control

よりよい並列処理能力を提供するため、OCC(Optimistic Concurrency Control/楽観的並行性制御)を採用します。トランザクション処理の一時結果はコミット段階までHBaseに書かずメモリに保存します。ロックもコミット段階までかけないようにします。

例えば、同じresourceを使うトランザクション処理t1t2があります。

t2(read only)がt1より開始時間が遅くなりますが、t2はt1コミット開始前に完了すると、両方とも成功です。PCC(Pessimistic Concurrency Control/悲観的並行性制御)の場合は、t2は失敗です。

t2がt1より開始時間が遅くなります。t2はt1コミット前にコミット完了した場合、t2は成功ですがt1は失敗です。PCCの場合はt2は失敗です。

コミット

コミット段階は実際にHBaseデータを入れる段階です。Two-phase commitプロトコールを使用してトランザクション処理の原子性を保証します。トランザクション処理を4つの状態に分けます。

  • BEGIN: 最初の状態です。user logicを処理して、結果をメモリに保存します。
  • PREWRITE: コミットフェーズ1。ロックをかけます。処理結果をHBaseに保存します。
  • COMMITTED: コミットフェーズ2。PREWRITEでかけたロックを解除します。
  • ROLLBACK: PREWRITEは失敗したら、トランザクションロールバックを行います。

状態を保存するため、HBaseに二つデータモデルを追加します。

  • Row status column family

アプリケーションで使うすべてのテーブルにこのColumn Familyを追加します。CAS APIのコンペア機能は一つのcellをチェックしますから、isLocked:tid:ts の形で一つのcellに保存します。

ロックを長くかけないように、他のプロセスがロックされた行にアクセス時、もしロック時間(tsからの時間)が長いならロックを解除することができます。

  • Transaction status table

トランザクション状態を保存します。アプリケーションごとに一つだけが必要です。

フェーズ1

トランザクション状態はPREWRITEになっています。CAS APIを利用して、変更した行ごとにデータの更新と同時にロックをかけます。もしCAS操作が失敗した場合は、Conflict(更新するデータはすでにロックがかけされます)が起きてると見なされるのでロールバックを行います。

データ更新の擬似コード

public void lockAndUpdateRow(String table, String row, String tid, String ts, String oldStatus, Map data) {
  String lockStatus = "{lock:tid:ts}";
  Put put = new Put(row);
  put.add(COLUMNFAMILY_STATUS, QUALIFIER_STATUS, lockStatus); // put row lock status
  put.add(..., tid) // put data with version
  if(!checkAndPut(table, row, COLUMNFAMILY_STATUS, QUALIFIER_STATUS, oldStatus, put))
    throw new RowConflictException();
}

全ての行が更新されたら、トランザクション状態をCOMMITTEDに変更します。

トランザクション状態変更の擬似コード

public void changeTransactionStatusToCommitted(String tid) {
  Put put = new Put(tid);
  put.add(COLUMNFAMILY_STATUS, QUALIFIER_STATUS, "COMMITTED"); // put transaction status
  if(!checkAndPut(TRANSACTION_TABLE, newTid, COLUMNFAMILY_STATUS, QUALIFIER_STATUS, "PREWRITE", put))
    throw new TransactionConflictException();
}
フェーズ2

トランザクション状態はCOMMITTEDになっています。実行プロセスはすべてのrow lockを持っていますのでConflictが起きません。ロックはほかのプロセスから解除することが可能です。二度とロックを解除しないようにCAS APIを利用してロックされた行ごとにロックを解除します。

擬似コードはこうなります。

public void unlockRow(String table, String row, String tid, String lockStatus, long ts) {
  String unlockStatus = "{not_lock:tid:ts}";
  Put put = new Put(row);
  put.add(COLUMNFAMILY_STATUS, QUALIFIER_STATUS, unlockStatus); // put new row status
  checkAndPut(table, row, COLUMNFAMILY_STATUS, QUALIFIER_STATUS, lockStatus, put);
}

すべてのロックを解除したらトランザクション処理が終わります。HBaseに保存したトランザクション状態をFINISHEDなどに変更する必要性はありません。

ロールバック

コミット中Conflictなどが発生したらすでに更新したデータの取り消しとロックの解除をしなければなりません。HBaseのCell Versionを利用してロールバック機能を実装しました。データをHBaseに入れるときにトランザクションごとでバージョンを付けます。ロールバック中は指定バージョンのデータを削除します。

ロールバック中でもClient Faultなどが起きるかもしれないため、複数のプロセスが同時にロールバックできるように設計しました。それでもロールバックをしようとする時に、他のプロセスがすでにロールバックをしたこともあります。ロールバック時点で行とトランザクションの状態確認が必要です。

  • プロセスは自分が処理しているトランザクションのロールバックをするときに、トランザクション状態をROLLBACKに変更してみて、成功したらすでに更新した行のロールバックをします。
  • トランザクション状態をPREWRITEからROLLBACKに変更するときの結果によって行為も違います。
  • 行状態の変更が成功したら、ロックの解除とデータの削除を行います
  • 行状態の変更が失敗したら、行のトランザクション状態を確認します

トランザクション状態変更の擬似コード

public ROLLBACK_TYPE changeTransactionStatusToRollback(String tid) {
  Put put = new Put(tid);
  put.add(COLUMNFAMILY_STATUS, QUALIFIER_STATUS, "ROLLBACK"); // put transaction status
  if(!checkAndPut(TRANSACTION_TABLE, tid, COLUMNFAMILY_STATUS, QUALIFIER_STATUS, "PREWRITE", put)) {
    if({nowStatus} == ROLLBACK) {
      // maybe other process is doing rollback, just do rollback for the needed row
      return ROLLBACK_TYPE.ROW;
    }
    if({nowStatus} == COMMITTED) {
      // transaction was succeeded, just unlock the row
      return ROLLBACK_TYPE.ROW_UNLOCK;
    }
  }
  return ROLLBACK_TYPE.ALL;
}

行のロールバックの擬似コード

public void rollbackRow(String table, String row, String newStatus, String lastStatus, String version, List cells) {
  RowMutations mutations = new RowMutations(row);
  // delete cells' value by version
  mutations.add(new Delete(..., version));
  Put put = new Put(row);
  // revert to last status
  put.add(COLUMNFAMILY_STATUS, QUALIFIER_STATUS, lastStatus);
  mutations.add(put);
  checkAndMutate(table, row, COLUMNFAMILY_STATUS, QUALIFIER_STATUS, newStatus, put);
}

newStatus、lastStatusともトランザクション状態テーブルに行ごとに保存しています。

トランザクション id

トランザクション idについての要件は

  • 一意に識別するための識別子です
  • 更新したcellのバージョンとして使います
  • 単調増加 (データを取る時にいつも最新のデータをとります)
  • トランザクションid生成スピードはトランザクションの生成スピードを制約します
  • トランザクションid生成システムが障害になったら、トランザクション処理はできないので場合によってはデータ不整合も起きます

要件からみると、トランザクションid生成システムは分散化しなければなりません。実装の方法はたくさんありますが、今はHBaseに一つトランザクションid管理テーブルを追加して、incrementColumnValue APIでidを生成しています。トランザクションidについてもう一つ注意したところはトランザクション状態テーブルのRowKeyです。Hot Spotを避けるため、トランザクションidを反転させた文字列をRowKeyとして使っています。

トランザクション処理をいくつのグループに分けます。同じテーブルを扱うトランザクション処理を同じグループに入れます。

データ一致性

トランザクション処理が失敗した時のデータ一致性を見てみましょう。

コミット前

全ての更新はアプリケーションサーバのメモリにありますが、まだHBaseに反応されません。もし障害があったら、処理中のトランザクションは失敗になりますが、HBaseに保存するデータに影響はありません。

コミットフェーズ1

一部のデータがロックされて、最新バージョンのデータも更新されてしまいます。もし障害があった場合、処理中のトランザクションは失敗します。他のプロセスはロックされた行にアクセスするときに、ロックがタイムアウトかどうかによってロールバックができます。

コミットフェーズ2

トランザクション状態変更は成功し、ロックを解除する時に障害が発生する場合です。処理中のトランザクションが成功しても、幾つかロックされた行が残ります。他のプロセスはこれらの行にアクセスするときに、tidからトランザクション状態を取って判断して、ロックを解除することができます。

最適化とパフォーマンス

トランザクションコミットを行うときに、トランザクションで扱うデータの行数とアクセスタイプに応じてコミット処理を行います。できるだけHBaseアクセス回数を減らします。

Single Row Read Only

行状態とデータは1つのget操作で一緒に取れます。get以外のHBase操作は不要です。行状態は1つcellに保存していますのでパフォーマンスに大きな影響はありません。

Single Row Write Only

まず行状態をとります。データを更新するときにCAS APIを利用して、状態が変更されなければデータを上書きします。ロックをかける必要性はありませんが、データを変更するため、行状態の更新が必要です。新しいtidを発行すれば解決はできますが、tidの生成はHBaseと通信が必要です。実はSingle RowのコミットにConflictの発生が可能ですがロールバックは発生しません、データバージョン(tid)の変更は不要です。実装中は行状態におけるtsを更新して、suffixも入れます。

Single Row Read And Write

Single Row Write Onlyとほぼ同じです。違うところは行状態を取るときにデータも一緒に取ります。

Multi Row Read Only

複数行を読む時の一致性を確保するため、最後の行を読んだらすでに取ってきた行の状態をもう一度確認しなければなりません。もし読む行数はnなら、n-1回の確認が必要です。Read Onlyトランザクションのロックはかけません。

mget

HBase通信回数を減らすため、実装中はmget apiを使って複数行のデータを取っています。例えば、行状態を確認する時はターゲット行をいくつのチャンクに分けてチャンクごとにmgetで状態を取得します。

Multi Row Write Only

ロールバックが発生可能です、データバージョン(tid)の生成が必要です。そして更新する行にロックをかけます。行数はmの場合、追加のHBaseアクセスは

  1. incrementColumnValue api, tidを生成します
  2. 行状態を取得するためm回のgetが必要です
  3. データ更新のm回のputをm回のcheckAndPutに変更します
  4. トランザクション状態を変更するため、2回のHBaseアクセスが必要です
Multi Row Read And Write

Multi Row Write Onlyとほぼ同じです。違うところはRead Onlyの行に対して状態の確認が必要です。

以上の分析から見ると、トランザクション処理の適用で大きな影響を受けるのは複数行を同時に更新する時です。このシステムはread性能重視を前提として設計しましたが、今の段階では問題なさそうです。

まとめ

この記事では、分離レベルSerializableのHBase cross-row、cross-tableトランザクション処理機能の設計実装周りを紹介しました。このシステムについては、現在、複数のシーンを想定してテストとチューニングを行っています。今後は、より良い性能と高い分離レベルを求めるため、トランザクション処理機能をHBase Coprocessorで実装するつもりです。

明日はやまぐちさんによる「CMake を使ったクロスプラットフォーム開発環境」についての記事です。お楽しみに!

Related Post