Armeria로 Reactive Streams와 놀자! – 1

Reactive Streams란?

LINE+에서 오픈소스 Armeria와 Central Dogma를 개발하고 있는 엄익훈입니다. 저는 Reactive Streams의 개념과, Reactive Streams를 오픈 소스 비동기 HTTP/2, RPC, REST 클라이언트/서버 라이브러리인 Armeria에서 사용하는 방법에 대해 공유하려고 하는데요. 이번 포스팅에선 먼저 Reactive Streams의 개념을 알아보겠습니다.

Reactive Streams는 공식 홈페이지인 reactive-streams.org에서 아래와 같이 정의하고 있습니다.

Reactive Streams is a standard for asynchronous data processing in a streaming fashion with non-blocking back pressure.

홈페이지에선 논블로킹(Non-blocking) 백 프레셔(back pressure)를 이용한 비동기 데이터 처리의 표준이라고 말하고 있는데요. ‘스트리밍 처리’, ‘비동기(asynchronous) 방식’, ‘백 프레셔’, 그리고 ‘표준’이라는 각각의 단어가 의미하는 바를 조금 더 자세히 알아보겠습니다.

 

스트리밍 처리

아래 그림은 전통적인 데이터 처리 방식과 스트리밍 처리 방식을 비교한 그림입니다.

왼쪽의 전통적인 데이터 처리 방식에서는, 데이터 처리 요청이 오면 페이로드(payload)를 모두 애플리케이션의 메모리에 저장한 후에 다음 처리를 해야 합니다. 추가로 필요한 데이터도 저장소에서 조회하여 메모리에 적재해야 합니다. 이 방식의 문제점은 전달된 데이터는 물론 저장소에서 조회한 데이터까지 모든 데이터가 애플리케이션의 메모리에 적재되어야만 응답 메시지를 만들 수 있다는 것입니다. 만약 필요한 데이터의 크기가 메모리 용량보다 크다면 ‘out of memory’ 에러가 발생하게 됩니다. 그리고 서비스를 운영하다 보면 꼭 하나의 요청이 ‘out of memory’를 발생시키지 않더라도, 순간적으로 많은 요청이 몰리면서 다량의 GC(Garbage Collection)가 발생, 서버가 정상적으로 응답하지 못하는 경우가 종종 나타납니다.

그런데 많은 양의 데이터를 처리하는 애플리케이션에 스트림 처리 방식을 적용하면, 크기가 작은 시스템 메모리로도 많은 양의 데이터를 처리할 수 있습니다. 입력 데이터에 대한 파이프 라인을 만들어 데이터가 들어오는 대로 물 흐르듯이 구독(subscribe)하고, 처리한 뒤, 발행(publish)까지 한 번에 연결하여 처리할 수 있습니다. 이렇게 하면 서버는 많은 양의 데이터도 탄력적으로 처리할 수 있습니다.

 

비동기 방식

비동기 방식은 동기(synchronous) 방식과 비교하며 살펴보겠습니다. 아래 그림은 동기 방식과 비동기 방식의 처리 과정을 나타낸 그림입니다.

동기 방식에선 클라이언트가 서버에 요청을 보내면 응답을 받기 전까지 블로킹(blocking)됩니다. 블로킹된다는 것은 현재 스레드(thread)가 다른 일을 하지 못하고 기다린다는 것을 의미합니다. 따라서 두 개의 요청을 A와 B 서버로 보내면, A의 응답이 끝나고 나서야 B로 요청을 보낼 수 있습니다. 하지만 비동기 방식에서는 현재 스레드가 블로킹되지 않기 때문에 다른 일을 계속할 수 있습니다. A에게 요청을 보낸 뒤 다른 일을 처리할 수도 있고, 혹은 B에게 또 다른 요청을 보낼 수도 있습니다. 동기 방식과 비교하여 비동기 방식의 장점을 정리하면 아래와 같습니다.

  •  빠른 속도 – 두 개의 요청을 동시에 보내기 때문에 더 빠른 응답 속도를 보여줄 것입니다.
  •  적은 리소스 사용 – 현재 스레드가 블로킹되지 않고 다른 업무를 처리할 수 있어서 더 적은 수의 스레드로 더 많은 양의 요청을 처리할 수 있습니다.

 

백 프레셔

백 프레셔에 대해 알아보기 전에 RxJava로 유명해진 옵저버 패턴(observer patten)과 푸시(push) 방식, 그리고 풀(pull) 방식에 대해서 알아보겠습니다.

 

푸시 방식

옵저버 패턴에서는 발행자(publisher)가 구독자(subscriber)에게 밀어 넣는 방식으로 데이터가 전달됩니다. 발행자는 구독자의 상태를 고려하지 않고 데이터를 전달하는 데에만 충실합니다. 만약 발행자가 1초 동안 100개의 메시지를 보내는데 구독자는 1초에 10개밖에 처리하지 못한다면 어떻게 해야 할까요? 큐(queue)를 이용해서 대기 중인 이벤트를 저장해야 합니다.

서버가 가용할 수 있는 메모리는 한정되어 있습니다. 만약 초당 100개의 메모리를 계속 푸시한다면 버퍼는 순식간에 소모되고 말 텐데요. 버퍼를 다 사용해버려서 오버플로(overflow)가 발생하면 어떻게 될까요? 고정 길이 버퍼와 가변 길이 버퍼로 나누어 살펴보겠습니다.

  • 고정 길이 버퍼: 신규로 수신된 메시지를 거절합니다. 거절된 메시지는 재요청하게 되는데요. 재요청 과정에서 네트워크와 CPU 연산 비용이 추가로 발생합니다.
  • 가변 길이 버퍼: 이벤트를 저장할 때 ‘out of memory’ 에러가 발생하면서 서버 크래시(crash)가 발생합니다. ‘누가 그렇게 구현하겠어?’라고 생각할 수 있지만, Java에서 많이 사용되는 List가 가변 길이 자료 구조형입니다. 예를 들어 SQL로 많은 양의 데이터를 질의하면 DBMS는 발행자가 되고 여러분의 서버가 구독자가 되어 List 자료 구조형에 데이터를 전부 담으려고 하다가 다량의 GC가 발생하면서 서버가 정상적으로 응답할 수 없는 상태에 이를 수 있습니다.

이 문제를 어떻게 해결할 수 있을까요? 발행자가 데이터를 전달할 때 구독자가 필요한 만큼만 전달하면 해결할 수 있지 않을까요? 이게 바로 백 프레셔의 기본 원리입니다.

 

풀 방식

풀 방식에선 구독자가 10개를 처리할 수 있다면 발행자에게 10개만 요청합니다. 발행자는 요청받은 만큼만 전달하면 되고, 구독자는 더 이상 ‘out of memory’ 에러를 걱정하지 않아도 됩니다.

여기서 좀 더 탄력적으로 적용하여, 구독자가 이미 8개의 일을 처리하고 있다면 추가로 2개만 더 요청하여 자신이 현재 처리 가능한 범위 내에서만 메시지를 받게 할 수 있습니다. 풀 방식에선 이렇게 전달되는 모든 데이터의 크기를 구독자가 결정합니다. 이런 다이나믹 풀 방식의 데이터 요청을 통해서 구독자가 수용할 수 있는 만큼만 데이터를 요청하는 방식이 백 프레셔입니다.

 

표준

Reactive Streams는 표준화된 API입니다. 왜 표준화가 필요했고 어떤 과정을 통해 표준화가 되었는지 알아보겠습니다. 

Reactive Streams는 2013년에 Netflix와 Pivotal, Lightbend의 엔지니어들이 처음 개발하기 시작했습니다. Netflix는 RxJava, Pivotal은 WebFlux, 그리고 Lightbend는 분산 처리 액터(actor) 모델을 구현한 Akka를 만든 회사인데요. 모두 스트림 API가 꼭 필요한 회사였습니다. 그런데 스트림은 서로 유기적으로 엮여서 흘러야 의미가 있습니다. 데이터가 지속적으로 흐르기 위해서는 서로 다른 회사가 공통의 스펙을 설정하고 구현해야 합니다. 그래서 표준화가 필요했습니다. 

Reactive Streams에선 2015년 4월에 JVM에서 사용하기 위한 1.0.0 스펙을 릴리스했습니다. 그리고 2017년 9월에, Reactive Streams의 API와 스펙, 풀(pull) 방식 사용 원칙을 그대로 포팅해서 Flow API라는 이름으로 java.util.concurrent 패키지 아래 포함시킨 Java 9이 릴리스되었습니다. 이는 커뮤니티와 일부 기업에서 주도해 개발했던 Reactive Streams가 Java의 공식 기능이 되었다는 것을 의미합니다. 이어서 3달 뒤, Reactive Streams에서 Flow와 상호 변환이 가능한 어댑터를 릴리스하면서, 기존에 만들어진 라이브러리를 사용할 수 있게 되었습니다.

 

Reactive Streams API

Reactive Streams는 그럴듯한 단어로 설명되어 뭔가 복잡할 것 같지만, 실제 내부는 아주 간단한 API들의 조합으로 구성되어 있습니다.

public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s);
}
 
public interface Subscriber<T> {
   public void onSubscribe(Subscription s);
   public void onNext(T t);
   public void onError(Throwable t);
   public void onComplete();
}
 
public interface Subscription {
   public void request(long n);
   public void cancel();
}
  •  Publisher에는 Subscriber의 구독을 받기 위한 subscribe API 하나만 있습니다.
  •  Subscriber에는 받은 데이터를 처리하기 위한 onNext, 에러를 처리하는 onError, 작업 완료 시 사용하는 onComplete, 그리고 매개 변수로 Subscription을 받는 onSubscribe API가 있습니다.
  •  Subscription은 n개의 데이터를 요청하기 위한 request와 구독을 취소하기 위한 cancel API가 있습니다.

이제 Reactive Streams에서 위 API를 사용하는 흐름을 살펴보겠습니다.

  1. Subscriber가 subscribe 함수를 사용해 Publisher에게 구독을 요청합니다.
  2. Publisher는 onSubscribe 함수를 사용해 Subscriber에게 Subscription을 전달합니다.
  3. 이제 Subscription은 Subscriber와 Publisher 간 통신의 매개체가 됩니다. Subscriber는 Publisher에게 직접 데이터 요청을 하지 않습니다. Subscription의 request 함수를 통해 Publisher에게 전달합니다.
  4. Publisher는 Subscription을 통해 Subscriber의 onNext에 데이터를 전달하고, 작업이 완료되면 onComplete, 에러가 발생하면 onError 시그널을 전달합니다.
  5. Subscriber와 PublisherSubscription이 서로 유기적으로 연결되어 통신을 주고받으면서 subscribe부터 onComplete까지 연결되고, 이를 통해 백 프레셔가 완성됩니다.

백 프레셔가 좋은 건 알겠는데, 과연 어떻게 사용하는 걸까요? Reactive Streams API는 GitHub에 들어가서 살펴봐도 위에서 살펴본 인터페이스가 전부입니다. 따로 구현체가 없습니다.

그렇다면 직접 구현해서 사용하면 될까요? 앞서 나온 규칙대로 Publisher 인터페이스를 구현하고 이를 구독할 때 Subscription을 생성해서 넘겨주도록 구현할 수는 있습니다. 하지만 이게 전부가 아닙니다. Reactive Streams에는 API 외에도 명세서가 있는데요. 이 명세서에는 단순한 인터페이스와는 달리 구현 시 따라야 하는 규칙이 복잡하게 명세되어 있습니다. 

이 명세에 맞춰 직접 구현한 기능은 Reative Streams TCK라는 툴로 검증할 수 있는데요. 해당 분야의 전문가가 아니라면 모든 규칙을 만족하도록 구현하는 게 꽤나 까다로운 일입니다. 특히 Publisher를 구현하는 게 어렵습니다. Java의 유명한 Reactive Streams 구현체 중 하나인 Project Reactor의 GitHub에 올라온 이슈를 살펴보면, 어떤 사용자가 자신이 만든 커스텀 Publisher가 Flux와 잘 연결되지 않는다는 내용으로 이슈를 올려놓은 게 있습니다(참고). 이에 대한 Project Reactor의 답변은 단호합니다. ‘만들지 마세요!’라고 답변했습니다. 학습 차원에서 토이 프로젝트로 만드는 것은 저도 추천합니다. 공부도 되고, 좋습니다. 하지만 실제 운영에서 사용할 코드라면 검증되지 않은 코드보다는,  Flux.create()와 같은 생성자 함수를 활용해서 Publisher를 만드는 게 좋다고 생각합니다. Reactive Streams를 사용하려면 직접 구현하는 것보다는 이미 잘 만들어져서 검증까지 받은 구현체를 사용하는 게 좋습니다. 그렇다면 이런 구현체에는 어떤 게 있을까요?

 

Reactive Stream 구현체와 상호운영성(interoperability)

Reactive Streams에는 다양한 구현체가 존재합니다. 각각의 구현체는 서로 특성이 조금씩 다르기 때문에 상황에 따라, 필요에 맞게 골라서 사용할 수 있습니다.

이렇게 각각 특성이 다른 구현체들은 모두 Reactive Streams를 통해서 서로 통신할 수 있습니다. 

 

Reactive Stream 상호 운영

RxJava의 Observable는 Reactive Streams를 통해서 Armeria의 HttpResponse나 Project Reactor의 Flux로 변환될 수 있고, MongoDB의 DataPublisher는 Akka Streams의 Source를 통해 스트림 연산을 할 수 있습니다.

예를 들어 MongoDB에서 조회한 데이터를 연산 처리한 후 HTTP 응답으로 보낸다고 가정해 보겠습니다. Reactive Streams를 이용해서 MongoDB에서 데이터를 조회하면 우선 데이터를 구독 받을 수 있는 Pulisher만 반환됩니다. Subscriber가 Subscription을 통해 request를 호출하기 전까진, 실제 데이터는 전달되지 않습니다. 

// Initiate MongoDB FindPublisher
FindPublisher<Document> mongoDBUsers = mongodbCollection.find();

아래 코드와 같이 Obsevable의 fromPublisher를 통해서 MongoDB의 FindPublisher와 연결할 수 있습니다. 그리고 map 연산자를 이용해 조회 결과에서 age 필드를 추출할 수 있습니다.

// MongoDB FindPublisher -> RxJava Observable
Observable<Integer> rxJavaAllAges =
    Observable.fromPublisher(mongoDBUsers)
              .map(document -> document.getInteger(“age”));

RxJava의 Observable은 옵저버 패턴으로 구현되어 있기 때문에, 이를 Reactive Streams로 변환하기 위해서는 아래 코드와 같이 toFlowable 함수를 이용해야 합니다. 변환 후 Flux의 from 함수를 이용해 RxJava의 Flowable과 Flux를 연결할 수 있습니다.

// RxJava Observable -> Reactor Flux
Flux<HttpData> fluxHttpData =
    Flux.from(rxJavaAllAges.toFlowable(BackpressureStrategy.DROP))
        .map(age -> HttpData.ofAscii(age.toString()));

Flux 자료 구조를 HTTP 응답으로 사용하려면 아래와 같이 데이터 앞에 HTTP 헤더를 붙여야 합니다. 그 후 Armeria의 HTTP 응답으로 사용하기 위해 HttpResponse.of를 호출해 Flux와 연결합니다. 

// Reactor Flux -> Armeria HttpResponse
HttpResponse.of(Flux.concat(httpHeaders, fluxHttpData));

앞서도 언급했지만 명심할 것은, 일반적인 Iterator와는 달리 아직 실제 연산(변환, 조작, 계산 등)은 전혀 일어나지 않았다는 점입니다. 단지 데이터가 어떻게 Subscriber에게 흘러갈지 그 행위를 기술한 것뿐입니다. Reactive Streams에서는 Subscriber가 데이터를 요청하기 전까지는 아무런 데이터도 전송하지 않아야 합니다.

 

마치며

이번 포스팅에서는 Reactive Streams와 그 구현체, 그리고 상호운영성에 대해 알아보았습니다. Reactive Streams를 웹 프로그래밍에서 활용하기 위해서는 HTTP 요청과 응답에 백 프레셔를 이용해야 합니다. 이를 통해 유입되는 트래픽의 양에 탄력적으로 반응해야 하는데요. 다음 포스팅에선 이와 관련해 Armeria에서 Reactive Streams를 어떻게 사용하고 있는지 자세히 알아보겠습니다. 많이 기대해주세요!

Related Post