LINE LIVE의 PC 송출 기능을 위한 queue 구현

안녕하세요, LINE LIVE 개발을 담당하고 있는 Yappo입니다. 이번 블로그에서는 사용자의 PC에서 LINE LIVE 서비스를 송출하는 기능을 구현하기 위해 task 실행을 지연하는 queue를 만든 과정을 소개하겠습니다.

LINE LIVE의 송출 방식

기존에 LINE LIVE에서 제공하는 송출 방식은 LINE LIVE 앱에서 직접 송출하는 방식, LINE Official Account Manager와 RTMP 소프트웨어나 전용 장비를 사용하여 PC에서 송출하는 방식이 있었습니다. 다시 말해서, 사용자는 LINE LIVE 앱을 사용해야 방송을 할 수 있고 PC에서는 불가능했습니다. PC에서의 방송은 LINE OA 관리자만 가능했습니다.

아래 그림을 보면 두 송출 방식이 완전히 다른 구조로 구현되는 것처럼 보이지만 기본 설계는 동일합니다.


LINE Official Account Manager를 사용해서 PC에서 방송을 송출하려면 몇 단계를 거쳐야 합니다. 반면 LINE LIVE 앱은 시작 버튼을 누르는 순간 바로 방송을 시작합니다. 두 방식이 작업을 처리하는 순서가 약간 다르기는 하지만 주요 흐름은 아래와 같이 동일합니다.

  1. RTMP 전송을 시작한다.
  2. LINE LIVE에 방송 시작을 알린다.
  3. RTMP 전송을 중지한다.
  4. LINE LIVE에 방송 중지를 알린다.

일반 사용자를 위한 PC 송출

시장 상황을 보면 LINE Official Account가 없는 일반 사용자의 경우도 PC 송출에 대한 수요가 높다는 점이 확실했습니다. 이러한 사용자에게 즉시 가치를 제공하지 못하면 서비스의 기회손실이 발생할 것이기 때문에 일반 사용자를 위한 PC 송출 기능을 구현하기로 결정했습니다. PC 송출 기능을 빠른 시일 내에 제공하는 동시에 철저한 QA를 통해 품질을 보장해야 가장 큰 가치를 창출할 수 있다고 판단했습니다. 그래서 기존 아키텍처를 크게 바꾸지 않고 PC 송출 기능을 개발하는 방안을 찾게 되었습니다.

기존 아키텍처의 변경을 최소화하려면 클라이언트 쪽에 변경이 없어야 합니다. 클라이언트, 즉 LINE LIVE 앱을 변경하려면 사용자가 앱을 업데이트해야 합니다. 다행히 LINE LIVE는 작년에 웹 LINE 로그인 기능을 도입했기 때문에 웹 환경을 변경하면 될 것 같았습니다. 이미 웹 환경에서 송출이 가능한 LINE Official Account Manager가 있기는 했지만 이 기능은 B2B(business-to-business)를 대상으로 하는 서비스이기 때문에 일반 사용자와는 이용 사례가 달랐습니다. 또한 웹 환경에서는 송출 상태를 정기적으로 모니터링해야 하고 적지 않은 개발 비용이 필요하기 때문에 빠른 시일 내에 개발하기 어려울 거라고 판단했습니다. 그래서 웹에서는 PC 송출을 설정하고 관련 정보를 표시하는 간단한 기능만 구현했습니다.

아키텍처를 설계하는 데에도 많은 시간을 허비할 수 없었기 때문에 아래 그림과 같이 최대한 단순한 구조의 아키텍처로 구현하기로 했습니다.

보시다시피 LINE LIVE의 송출 흐름은 상당히 단순합니다. 즉, RTMP 송출을 감지한 시점에 방송을 시작하고 송출이 끊어지면 방송을 종료합니다. 기존의 LINE LIVE 앱에서 이러한 송출 흐름을 적용하지 않은 이유는 방송 타이틀 설정 등을 처리하기 위해 API와 통신해야 하기 때문입니다. (이런 이유로 PC 송출의 경우 방송 타이틀이 단순하게 설정됩니다.) 다시 말해서, 앱은 송출이 중단된 후에도 실행 상태를 유지해야 합니다.

PC 송출을 위한 queue 구현

송출 흐름을 구현하는 작업 자체는 몇 시간 만에 끝났지만, ‘RTMP 전송이 끊어지는 순간 방송이 종료’되어 버리는 문제가 있었습니다. 송출하는 쪽의 네트워크 환경이나 송출 장비의 부하 상황에 따라 RTMP 전송이 끊기는 경우가 종종 있기 때문에 최적의 서비스 환경을 보장할 수 없는 것입니다. 이러한 불편함을 해소하기 위해 RTMP 전송 중단을 감지하면 몇 초 동안 대기한 뒤에 방송을 종료하도록 만들었습니다. 단, 대기 시간이 너무 길면 정상적으로 송출을 중단한 후에도 시청자의 앱에서는 여전히 방송 상태인 것처럼 보이게 됩니다. 시청자가 혼란스러워할 수 있기 때문에 대기 시간은 상황을 보며 조절해야 합니다.

해결책을 찾긴 했지만 난감한 문제가 하나 남아 있었습니다. LINE LIVE에서 사용하는 JobQueue 시스템은 지정 시간(초)이 경과한 시점에 task를 실행하는 기능을 지원하지 않았습니다. 시중에 이런 기능을 제공하는 소프트웨어가 여럿 있지만, 새로운 소프트웨어를 검증하고 서버를 증설하는 동안 비즈니스 기회를 놓칠 것 같았습니다. 그래서 새로운 툴을 찾는 대신 우리 시스템에서 이미 검증된 사용 가능한 툴이 있을지 알아보았습니다.

그 결과 선택한 것이 Redis입니다. Redis는 queue 생성에 유용한 타입을 제공하기 때문입니다.

초기 구현

먼저 List 타입을 사용해서 간단한 queue를 만들었습니다. 코드의 예는 아래와 같습니다.

fun enqueue(id: Long, runUnixTime: Long) {
    redis.lpush(redisKey, mapOf("id" to id, "runAt" to runUnixTime ))
}

fun dequeue(): Long? {
    return redis.rpop(redisKey)?.get("id")
}

그런데 queue에 enqueue된 어떤 task가 dequeue되는 시점에 이 task의 실행 시간, 즉 runAt이 미래로 지정되어 있으면 실행 문제가 발생하기 때문에 다시 enqueue해서 queue에 넣어야 하는 번거로움이 있었습니다. Queue 처리가 복잡해지면 버그가 생길 가능성이 높아지고 나중에 장애의 원인이 되기 때문에 이 방식은 개선이 필요했습니다.

Sorted Sets를 사용한 구현

개선책은 Redis가 실행 시간이 도달한 task만 처리하도록 만드는 것이었습니다. Redis가 제공하는 타입을 잘 살펴본 결과 Sorted Sets를 사용하면 된다는 것을 알게 되었습니다. ZRANGEBYSCORE 명령어를 사용해서 score를 task 실행 시간으로, max를 현재 시각으로 하면 실행 시간에 도달한 task만 실행 시간 순으로 dequeue할 수 있습니다.

fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}

fun dequeue(): Long? {
    return redis.zrangebyscore(redisKey, '-inf', now().toUnixTime(), 'LIMIT', 0, 1)
}

한 가지 주의할 점은 ZRANGEBYSCORE는 RPOP과 달리 Redis 스토리지에 쌓인 task를 삭제하지 않기 때문에 별도의 삭제 처리가 필요합니다.

fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}

fun dequeue(): Long? {
    val id = redis.zrangebyscore(redisKey, '-inf', now().toUnixTime(), 'LIMIT', 0, 1)
    redis.zrem(redisKey, id)
    return id
}

결과적으로 Redis에 쿼리를 2회 전송하게 되었습니다. 한 번은 dequeue할 task의 ID를 받기 위해, 또 한 번은 Redis에서 그 task ID를 삭제하기 위해 쿼리를 전송하는 것이죠. 만약 한 task를 여러 번 동시에 dequeue하려고 시도하는 버그가 생기면 어떻게 될까요? 동일한 ID가 여러 번 동시에 반환되어 동일한 task가 여러 번 동시에 dequeue 처리되는 상황이 될 것입니다. 이러한 오류를 방지하기 위해 dequeuing 처리 과정을 atomic하게 만들고 싶었습니다. 그래서 Redis에서 Lua script를 써서 한 번의 쿼리로 여러 개의 Redis 명령어를 실행하도록 코드를 변경했습니다.

// initialize
val lua = "local queue = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)[1]\n" +
          "if (queue) then\n" +
          "  redis.call('ZREM', KEYS[1], queue)\n" +
          "end\n" +
          "return queue\n";
val sha = redis.scriptLoad(lua)

fun enqueue(id: Long, runUnixTime: Long) {
    redis.zadd(redisKey, runUnixTime, id)
}

fun dequeue(): Long? {
    return redis.evalsha(sha, 1, redisKey, now().toUnixTime())
}

이로써 task를 dequeue하는 동시에 Redis에서 해당 task를 삭제하는 queue를 구현했습니다.

구현한 queue 실행

Queue의 구조를 단순하게 설계해서 개발 기간을 줄였는데 새로운 소프트웨어를 도입하는데 시간을 들이면 앞선 노력이 무의미해질 것입니다. 그래서 전통적인 UNIX 방식에 따라 실행 방법도 단순화했습니다.

아래와 같이 방송 종료 task를 dequeue하는 API를 만들고 crontab을 이용해서 지정 시간(초)마다 curl 명령어로 이 API를 호출하도록 했습니다.

for (id in dequeue()) {
    broadcastingManager.stop(id)
}

마치며

지금까지 LINE LIVE 사용자에게 PC에서 방송을 송출하는 기능을 제공하기 위해 queue를 구현한 과정을 소개했습니다. 앞으로도 LINE LIVE 송출 시스템의 개선점을 모색해서 더 좋은 사용자 경험을 제공할 수 있도록 노력하겠습니다.

Related Post