LINE Engineering
Blog

RedisのSorted Setsで簡易的な遅延実行Queueを作って迅速にLINE LIVEのPC配信対応をリリースした話

Yappo 2017.06.22

みなさんこんにちは、LINE LIVE開発のYappoです。
今回は先日リリースされました一般向けのPC配信機能を実装するときに作った簡易的な遅延実行Queueについて書いていこうと思います。
関連エントリ:LIVE PRESS 公式ブログ - ゲーム実況にもぴったり!LINE LIVEでPC横型ライブ配信を試してみよう

背景

今までのLINE LIVEでの配信方法としては、アプリ上で直接配信する方法と、公式アカウント向けの専用画面(LINE OFFICIAL ACCOUNT MANAGER)とRTMPソフト(もしくは専用機材)を利用してPCからの配信する方法がありました。
この2つの方法は全く違う仕組みで実装されるように見えますが、実は基本となる設計は同じです。

上に2つのパターンのシーケンス図を掲載しましたが、出て来る要素はそれぞれ若干異なりますが

  1. RTMP送信開始をする
  2. LINE LIVE側に配信開始を伝える
  3. RTMP送信停止をする
  4. LINE LIVE側に配信停止を伝える

    という流れは同じだとわかります。そうです、LINE LIVE AppではLINE OFFICIAL ACCOUNT MANAGERを利用した各種作業手順をアプリのボタンを押しただけで全自動で実行してくれるようになっています。

一般向けの配信機能では何が必要なのか

一般向けのPC配信機能を提供するという事が決定したのはリリースの前週でした。PCでの配信を行うことの需要が高いのは世の中を見ていれば自明なため、即座に世の中に価値を提供できなければサービスとしての機会損失が日に日に増大して行くことは明らかなのです。なので、確実にQAを行い品質を担保しつつも、素早くPCでの配信できる機能をリリースする事が世の中に対して一番の価値を提供できると考え、既存のアーキテクチャを大きく変更する事なくPCでの配信をリリースする方法を選択する必要がありました。

本題に戻りますが、既存のアーキテクチャの変更を最小限に留めて実装する事を考えると、アプリのバージョンアップが必要な開発は行えません。幸いLINE LIVEでは昨年にWebからのLINE Loginに対応していたのでWeb上に変更を入れれば良さそうです。Web上での配信する機能としてはLINE OFFICIAL ACCOUNT MANAGERがすでに存在していますので、それを流用すれば良いように思えますが、主にBtoB向けの画面であり一般的なユースケースと違うため流用は無理でした。またWeb上で定期的に配信の状態監視を行う必要があり、少なくない開発コストが必要となるため、今回の迅速なリリースの目標は達成できないと判断しました。

Web側の実装は最小限に、以下のようにメニューの追加とPCで配信するための情報を提供するだけのシンプルなものにしました。

実装をする

こちらの構成検討にも時間は消費できないので、一番シンプルな構成として以下の図のようなアーキテクチャで実装することにしました。
かなり単純になりました。純粋にRTMPの送出を検知した段階で配信開始処理を行い、切断時に配信終了処理を行います。
既存の配信フローもこれで良いと思われるかもしれませんが、配信タイトルの設定などの処理が必要なためAPIとの通信は必要なのです。(なのでPC配信では配信タイトルは簡易的なものが設定されます)

こちらの実装自体は数時間で完了したものの、一つの問題として「RTMPが送信されなくなった瞬間に配信が終わってしまう」という使いづらいものになっていました。配信側の回線環境や配信マシンの負荷状況によってはRTMP送信が途切れる事が多々あるので、使いづらい配信サービスとなってしまいます。

Queue実装の検討

結論としては、RTMPの切断を検知した後、一定の秒数を待った後に配信終了処理を行います。この一定の秒数が長すぎると、通常の配信切断を行ったつもりなのにアプリ上では配信中として残り続けて混乱が生じるため、この時間は状況を見て調整していこうと思っています。

方針が決まったのは良いのですが、困ったことにLINE LIVEで利用している既存のJobQueueシステムでは指定秒数後にJobを実行する仕組みが存在していないので、これを解決する必要があります。
世の中にはこれを実現するソフトウェアがいくつも存在しているのですが「今から検証してサーバを追加して導入していたらどんどん機会が損失してしまう!!!」と考え、身の回りに存在する道具で迅速に作れないかを考えました。

そうです、ようやく本題のRedisの話になります。
LINE LIVEでは元々Redisを利用していました。RedisにはQueueを作るために役立つ型も存在しているので、Redisを利用することにします。ただし単純明快な実装にしないと、実装速度もですが他の開発メンバーの負担が大きくなってしまうので、そういった部分に気を使う必要がありました。

最初の実装

まずは単純にLists型を用いて単純なQueueを作成しました。疑似コードとしては以下のような感じです。
fun enqueue(id: Long, runUnixTime: Long) {
    redis.lpush(redisKey, mapOf("id" to id, "runAt" to runUnixTime ))
}
 
fun dequeue(): Long? {
    return redis.rpop(redisKey)?.get("id")
}

単純明快で良さそうですが、dequeueしたqueueのrunAtが未来の時間を指定していたら実行させられないので、再度enqueueでqueueに戻す必要があり煩雑になってしまいました。 このような手順が煩雑になると、いつかバグが入り込んで障害の原因になるため避けなければなりません。

Sorted Setsを使う

まず実現したいこととしては、実行時間に達したqueueだけをRedisから取得することです。ここでRedisで提供している型をよく見てみると、みんな大好きSorted Setsのscoreに実行時間を入れてZRANGEBYSCOREで現在時刻を指定して取得すれば、dequeueすべきqueueだけを実行時間順に取得できるということに気がつきました。
fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}
 
fun dequeue(): Long? {
    return redis.zrangebyscore(redisKey, '-inf', now().toUnixTime(), 'LIMIT', 0, 1)
}

これで完璧!と一瞬思ってしまいますがZRANGEBYSCOREはRPOPと違ってRedisからの要素の削除を行いませんので、削除をする必要があります。

fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}
 
fun dequeue(): Long? {
    val id = redis.zrangebyscore(redisKey, '-inf', now().toUnixTime(), 'LIMIT', 0, 1)
    redis.zrem(redisKey, id)
    return id
}

うーん、これだとRedisに対して2回もQueryが飛んでしまいますし、もしdequeueする処理が同時に複数回実行されてしまうバグなどが混入した時には、同じidを複数回dequeueしてしまうリスクがないとは言い切れません。。。どうにかAtomicにしたいですね。。。
RedisにはLua Scriptingという機能が用意されており、これを使うと1回のQueryで複数個のRedisコマンドをAtomincに実行できるようになります。今回はこれを利用すると目的が簡単に実現できるので、Lua Scriptingを用いてdequeue処理を実装するようにしました。

// initialize
val lua = "local queue = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)[1]\n" +
          "if (queue) then\n" +
          "  redis.call('ZREM', KEYS[1], queue)\n" +
          "end\n" +
          "return queue\n";
val sha = redis.scriptLoad(lua)
 
fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}
 
fun dequeue(): Long? {
    return redis.evalsha(sha, 1, redisKey, now().toUnixTime())
}

これでdequeueしつつRedis上のqueueも同時に削除する簡易的なqueueが実装できました。

実行方法は?

Queueの仕組みを簡単にして開発期間を短縮したのはいいが、それの実行方法のために新しいソフトウェアを導入していては意味がないので、古き良きUNIX wayに乗っかり実行方法も単純にしました。

具体的にはInternalな領域に以下のようなdequeueして配信停止処理を呼び出す実装を書いたAPIを用意しておきcrontabで指定秒数おきにcurlコマンドでAPIを呼び出すという単純な方式を取ってます。
for (id in dequeue()) {
    broadcastingManager.stop(id)
}

終わりに

今回はユーザに素早く価値を提供しつつ、より良い実装を模索して実現する過程について紹介しました。 今回のPC配信機能も改善点が数多く存在しているので、サービスを提供しながら徐々に良い体験を提供できるようにしていきます。

LINE LIVE Redis

Yappo 2017.06.22

Add this entry to Hatena bookmark

リストへ戻る