Implementing a queue for LINE LIVE PC transmission

Greetings, this is Yappo, in charge of LINE LIVE development. In this blog post, I will introduce a queue for delayed tasks, which we created to support broadcasting on LINE LIVE from users' PCs.

How LINE LIVE broadcasting worked

We used to have two ways to broadcast on LINE LIVE. One was to broadcast directly from the LINE LIVE app, and the other was to broadcast from a computer. Broadcasting from a computer required the LINE Official Account Manager site and RTMP software (or a dedicated tool). In other words, general users could broadcast only from the LINE LIVE app; only LINE Official Account managers could broadcast from a computer.

Looking at the following two sequence diagrams, you might think that the two transmission types are implemented completely differently. Perhaps surprisingly, their fundamental designs are the same.

Broadcasting with the LINE Official Account Manager on PC consists of several steps. In contrast, broadcasting from the LINE LIVE app starts as soon as the user taps the start button. Although the sequences differ, the main flow is the same:

  1. Begin RTMP transmission.
  2. Announce the start of the transmission to LINE LIVE.
  3. Terminate RTMP transmission.
  4. Announce the transmission termination to LINE LIVE.

Transmission for general users

The market was clearly telling us that there was high demand for PC transmission among general LINE LIVE users who do not have a LINE Official Account. If we did not serve these users in time, we would lose a great opportunity. To reduce time to market, we decided to enable general LINE LIVE users to broadcast from their PCs.

To provide our users with a valuable service, we had to deliver PC transmission quickly. At the same time, we had to guarantee service quality with thorough QA. We therefore looked for a way to support transmission from PCs without modifying the existing architecture too much.

Implementing PC transmission with minimal architectural changes meant avoiding changes on the client side. Changing the client side, that is, the LINE LIVE app, would require our users to update their apps. Thankfully, LINE LIVE adopted web LINE Login last year, so changing the web side seemed feasible. Using the LINE Official Account Manager for transmitting through the web was appealing. However, the LINE Official Account Manager mainly targets B2B customers, so its use cases differ from those of general LINE LIVE users. For this reason, we implemented only the transmission configuration and viewing information on the web.

We did not have much time, so we could not spend long on the architecture. The result was a simply structured architecture, as shown below.

As you can see, the flow of LINE LIVE transmission is actually simple: start broadcasting when transmission over an RTMP connection is detected, and terminate broadcasting when transmission on the RTMP connection is no longer detected. The LINE LIVE app does not use this mechanism because the app needs to make API calls, for example to set the program title. (This explains why program titles are set in a simple way when transmitting via PC.) In other words, the app has to keep running even when the transmission has halted.

Implementing a queue for PC transmission

The implementation was completed within a few hours, but not without a problem: a broadcast terminated the moment transmission terminated. Transmission can halt simply because of a poor network environment on the user's side or load on the transmitter. With so many factors that could cause unwanted broadcast termination, we had to find a way to protect service quality. So we delayed broadcast termination for a couple of seconds after detecting a halt in transmission on an RTMP connection. If the delay is too long, viewers will be misled, unable to tell that what they have been watching has been cut off. To prevent viewers' confusion, we needed to choose the right delay, taking many variables into account.
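The idea of delaying termination can be sketched in a few lines of Kotlin. This is only an illustration under stated assumptions, not the LINE LIVE code: the class and method names are hypothetical, state is kept in memory rather than in a queue, and it assumes that a transmission resuming within the grace period cancels the pending stop, which the post does not spell out.

```kotlin
// Hypothetical sketch, not the LINE LIVE implementation.
// Assumption: a resumed transmission cancels the pending stop.
class DelayedTermination(private val graceSeconds: Long) {
    // broadcast ID -> Unix time (seconds) at which the broadcast should be stopped
    private val pendingStops = mutableMapOf<Long, Long>()

    // Called when the RTMP stream stops arriving: schedule the stop instead of stopping now.
    fun onTransmissionHalted(id: Long, nowUnixTime: Long) {
        pendingStops[id] = nowUnixTime + graceSeconds
    }

    // Called when the RTMP stream comes back: cancel any pending stop.
    fun onTransmissionResumed(id: Long) {
        pendingStops.remove(id)
    }

    // Returns (and forgets) the broadcasts whose grace period has expired.
    fun takeDueStops(nowUnixTime: Long): List<Long> {
        val due = pendingStops.filterValues { it <= nowUnixTime }.keys.toList()
        due.forEach { pendingStops.remove(it) }
        return due
    }
}
```

With a grace period of 5 seconds, a halt detected at time 100 only becomes a stop once the clock reaches 105, and a short network hiccup that recovers earlier never reaches viewers as a terminated broadcast.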

Unfortunately, adding the delay did not solve everything, as another issue came up. The JobQueue system on LINE LIVE did not support specifying a delay before a task is executed. We could have adopted existing software that solves this, but given the timeframe we were likely to lose business opportunities. So, instead of seeking a new tool, we started considering the tools that had already been approved for use on our system.

And the solution turned out to be Redis. Redis was chosen as it provides data types useful for creating queues.

At first

We made a simple queue using the List type. The following shows enqueuing and dequeuing.

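fun enqueue(id: Long, runUnixTime: Long) {
    // Push the task (ID plus scheduled run time) onto the head of the list
    redis.lpush(redisKey, mapOf("id" to id, "runAt" to runUnixTime))
}

fun dequeue(): Long? {
    // Pop the oldest task from the tail, whether or not its "runAt" has arrived
    return redis.rpop(redisKey)?.get("id")
}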

The code seems straightforward, but operating it was not. If the execution time of a dequeued task (that is, "runAt") was still in the future, the task must not run yet. The dequeued task then had to be enqueued into the list again, which is quite bothersome. If the queue-handling code becomes complex, bugs become more likely and will eventually cause an outage. How could we make the operation as simple as the code looks? Sorted sets came into the picture.
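To make the re-enqueue problem concrete, here is a minimal in-memory sketch of the List-based approach. It is not the original code: a plain MutableList stands in for the Redis List, and the names Task and ListQueue are made up for illustration.

```kotlin
// Hypothetical in-memory stand-in for the List-based queue; not the original code.
data class Task(val id: Long, val runAt: Long)

class ListQueue {
    // Index 0 is the head of the list, the last index is the tail.
    private val list = mutableListOf<Task>()

    // Like LPUSH: add the task at the head.
    fun enqueue(id: Long, runAt: Long) {
        list.add(0, Task(id, runAt))
    }

    // Like RPOP followed by a check: a task that is not due yet must be pushed back.
    fun dequeue(now: Long): Long? {
        if (list.isEmpty()) return null
        val task = list.removeAt(list.lastIndex)
        if (task.runAt > now) {
            list.add(0, task) // not due yet, so re-enqueue it
            return null
        }
        return task.id
    }
}
```

If task 1 (due at 200) is enqueued before task 2 (due at 100), a dequeue at time 150 first pops task 1, pushes it back, and returns nothing even though task 2 is already due. Only the next call returns task 2. The List keeps insertion order, not execution order, which is what makes the handling code grow.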

Implementing with Sorted Sets

We wanted Redis to hand over only the tasks that have reached their execution time. After researching the Redis data types, we realized that Sorted Sets were the answer. We used the ZRANGEBYSCORE command with each task's execution time as its score and the current time as max. This allows dequeuing only the tasks that have reached their execution time, in order of execution time.

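fun enqueue(id: Long, runUnixTime: Long) {
    // Score = scheduled run time, member = task ID
    redis.zadd(redisKey, runUnixTime, id)
}

fun dequeue(): Long? {
    // Fetch at most one task whose run time is at or before now
    return redis.zrangebyscore(redisKey, "-inf", now().toUnixTime(), "LIMIT", 0, 1).firstOrNull()
}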

Note, however, that the ZRANGEBYSCORE command, unlike the RPOP command, does not delete tasks from Redis. You therefore need to delete executed tasks explicitly.

fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}

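fun dequeue(): Long? {
    // Query 1: fetch at most one due task; return early if nothing is due
    val id = redis.zrangebyscore(redisKey, "-inf", now().toUnixTime(), "LIMIT", 0, 1).firstOrNull()
        ?: return null
    // Query 2: delete the fetched task
    redis.zrem(redisKey, id)
    return id
}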

As a result, two queries are made to Redis: one gets the ID of the task to dequeue, and the other deletes that task ID from Redis. Imagine a bug that causes several simultaneous attempts to dequeue a task. In that case, the same ID would be returned to every caller, and the same task would be dequeued multiple times. To prevent this, we wanted to make dequeuing atomic, so we changed the code to run multiple Redis commands in a single query by using a Lua script on Redis.
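The interleaving that causes the duplicate can be replayed by hand. The sketch below is an assumption-laden illustration, not the production code: FakeSortedSet is a hypothetical in-memory stand-in for a Redis Sorted Set, and the two "workers" are simulated sequentially so that the outcome is deterministic.

```kotlin
// Hypothetical in-memory stand-in for a Redis Sorted Set, for illustration only.
class FakeSortedSet {
    private val scoreByMember = mutableMapOf<Long, Long>()

    // Roughly ZADD: set the score of a member.
    fun zadd(score: Long, member: Long) {
        scoreByMember[member] = score
    }

    // Roughly ZRANGEBYSCORE key -inf max LIMIT 0 1: lowest-scored member with score <= max.
    // Ties between equal scores are broken arbitrarily in this stand-in.
    fun firstByScore(max: Long): Long? =
        scoreByMember.filterValues { it <= max }.minByOrNull { it.value }?.key

    // Roughly ZREM: true only if the member was actually present.
    fun zrem(member: Long): Boolean = scoreByMember.remove(member) != null
}

// Replays the problematic order: both workers read before either one deletes.
fun replayRace(): Pair<Long?, Long?> {
    val set = FakeSortedSet()
    set.zadd(score = 100L, member = 42L)
    val seenByA = set.firstByScore(max = 150L) // worker A reads
    val seenByB = set.firstByScore(max = 150L) // worker B reads before A has deleted
    seenByA?.let { set.zrem(it) }
    seenByB?.let { set.zrem(it) }
    return seenByA to seenByB
}
```

Both workers come away with ID 42, so the same broadcast would be stopped twice. The script-based version that follows closes this gap by doing the read and the delete in one step.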

// initialize
val lua = "local queue = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)[1]\n" +
          "if (queue) then\n" +
          "  redis.call('ZREM', KEYS[1], queue)\n" +
          "end\n" +
          "return queue\n"
val sha = redis.scriptLoad(lua)

fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}

fun dequeue(): Long? {
    return redis.evalsha(sha, 1, redisKey, now().toUnixTime())
}

In this way, we have a queue that dequeues tasks and deletes the tasks from Redis at the same time.
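The effect of making "read, then delete" a single step can be tried without Redis. The following sketch is an in-process analogy, not the Lua-based implementation itself: AtomicDelayQueue is a hypothetical class in which a JVM lock plays the role that the script's atomic execution plays on the Redis server, and a TreeSet stands in for the Sorted Set.

```kotlin
import java.util.TreeSet

// Hypothetical in-process analogy of the atomic dequeue; not the Lua-based original.
class AtomicDelayQueue {
    // (runAt, id) pairs ordered by run time, then by ID.
    private val tasks = TreeSet<Pair<Long, Long>>(
        compareBy<Pair<Long, Long>>({ it.first }, { it.second })
    )

    @Synchronized
    fun enqueue(id: Long, runAt: Long) {
        tasks.add(runAt to id)
    }

    // The lookup and the removal happen under one lock, so no other caller
    // can observe the task between the two steps.
    @Synchronized
    fun dequeue(now: Long): Long? {
        val head = tasks.firstOrNull() ?: return null
        if (head.first > now) return null // earliest task is not due yet
        tasks.remove(head)
        return head.second
    }
}
```

When several threads call dequeue concurrently on this class, each due ID is handed out to exactly one of them, which is the property the Lua script gives the Redis-backed queue across separate callers.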

Executing the implemented queue

By keeping the queue implementation simple, we shortened the development period. Had we then spent time adopting other software to execute the queue, that effort would have been in vain. This is why we kept the execution method simple as well, following the traditional UNIX way.

As shown below, we made an API that dequeues tasks and terminates the corresponding broadcasts. We also set up crontab to call this API with curl at a designated interval (in seconds).

// dequeue() returns one due task ID at a time, or null when nothing is due
while (true) {
    val id = dequeue() ?: break
    broadcastingManager.stop(id)
}

In closing

I have introduced a quick implementation of a queue that supports broadcasting from PCs for our LINE LIVE users. For a better user experience, we will keep looking for ways to improve the LINE LIVE transmission system.