Armeria로 Reactive Streams와 놀자! – 2

안녕하세요. LINE+에서 오픈소스 Armeria와 Central Dogma를 개발하고 있는 엄익훈입니다. 지난 포스팅에선 Reactive Streams의 개념을 살펴보았는데요. 이번 포스팅에선 Reactive Streams를 오픈 소스 비동기 HTTP/2, RPC, REST 클라이언트/서버 라이브러리인 Armeria에서 사용하는 방법에 대해 공유하려고 합니다.

 

What’s Armeria?

Armeria는 Java 8 및 Netty, Thrift, gRPC에 기반한 오픈 소스 비동기 HTTP/2, RPC, REST 클라이언트/서버 라이브러리입니다. Armeria는 경량(Lightweight) 마이크로 서비스 프레임워크지만 지원하는 기능은 다른 풀 스택(full stack) 웹 프레임워크와 비교해도 손색이 없습니다.

먼저 Armeria에서 Reactive Streams를 활용한 서버를 구현하기 위해선 기본적으로 알아야 할 것들을 알려드리겠습니다.

 

지원하는 프로토콜

Armeria에서는 HTTP/1과 HTTP/2를 모두 지원하고, 이 두 개의 프로토콜은 cleartext와 TLS(Transport Layer Security) 암호화 통신을 모두 지원합니다. HTTP/1에서 HTTP/2로의 호환성을 지원하기 위한 프로토콜 업그레이드 면에선, HTTP/2의 ‘connection preface’와 HTTP/1의 ‘upgrade request’를 모두 지원하고 있습니다.

또한 Armeria에선 gRPC와 Thrift가 HTTP/1과 HTTP/2, 양쪽에서 모두 동작합니다. 이는 Armeria만의 특별한 기능인데요. gRPC는 HTTP/1을 지원하지 않고, 기존의 Thrift에선 HTTP/2를 지원하지 않습니다. 하지만 Armeria에서는 모두 지원하고 있어서 다양한 비즈니스 환경에서 유연하게 사용할 수 있습니다. 또한 리눅스 환경에선 JNI(Java Native Interface) 기반의 소켓 IO와 BoringSSL 기반의 TLS를 통해서 더욱더 빠른 성능으로 프로덕션에 사용할 수 있습니다.

그럼 예제 코드와 함께 Armeria에 대해서 하나씩 알아보겠습니다.

 

예제 코드로 알아보는 Armeria

Armeria는 사용자 친화적인 API입니다. 코드가 간결하고 사용하기 쉽습니다. ‘Hello world’ 서버를 실행하고 싶다면 아래와 같이 5줄만 작성하면 됩니다. 

// Build your own server under 5 lines.
var server = Server.builder()
       .http(8080)
       .service(“/“, (ctx, req) -> HttpResponse.of(“Hello, World!”))
       .build();
 
server.start();

서버를 쉽게 실행할 수 있다는 점은 마이크로 서비스 환경에서 각각의 비즈니스 컴포넌트를 분리하여 독립된 서버로 관리하려고 할 때 핵심이 되는 사항입니다.

또한 서버 아키텍처를 단순하게 만들 수 있습니다. HTTPS나 HTTP/2를 사용하기 위해 별도로 사이드카(sidecar)인 Nginx나 Apache Httpd와 같은 정적 웹 서버를 실행할 필요가 없습니다. 앞서 언급했듯, 리눅스 환경에서는 JNI 기반의 소켓 IO와 BoringSSL을 지원하기 때문에 별도로 성능 저하를 고려하지 않아도 됩니다. 또한 JS나 CSS, 이미지와 같은 정적 파일을 서빙하는 기능도 제공하고 있습니다(참고). 

var server = Server.builder()
       .http(8080)
       .https(8443) // HTTPS support
       .tlsSelfSigned()
       .service(“/“, (ctx, req) -> HttpResponse.of(“Hello, World!”))
       .build();
server.start();

네트워크 홉(hop)을 추가하지 않아도 되기 때문에 장애점을 줄일 수 있고, 리소스를 절약할 수 있어 효율적으로 통신할 수 있으며, 아키텍처가 단순해지고, 모니터링이 쉬워지며, 서버를 유연하게 확장할 수 있습니다. 

그 외에도 유용한 기능이 많습니다. 그중 하나가 Armeria에서 빌트인으로 제공하는 어노테이션(annotation)입니다. 어노테이션을 이용하면 Armeria의 기능을 더욱 쉽게 사용할 수 있습니다. 예를 들어 아래와 같이 hello를 프리픽스(prefix)로, name을 경로 변수로 설정하여 라우팅하는 코드를 손쉽게 작성할 수 있습니다(Armeria 공식 문서를 참고하면 그 외 다양한 어노테이션을 더욱 자세하게 살펴볼 수 있습니다).

import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.server.annotation.PathPrefix;
 
// Use built-in annotations for mapping path and parameter injection
@PathPrefix(“/hello”)
class HelloService {
   @Get(“/:name”)
   public String hello(@Param String name) {
       return String.format(“Hello, %s!”, name);
   }
}

Armeria에는 다른 종류의 RPC 프로토콜을 동시에 서빙할 수 있는 특별한 기능도 있습니다. 하나의 서버에서 REST API와 gRPC, Thrift를 모두 제공할 수 있어서 비즈니스 요구 사항이나 아키텍처의 변화에 유연하게 대응할 수 있습니다. 또한 단일 포트로 제공하기 때문에 리소스를 효율적으로 사용하면서 보안 측면에서 불필요한 노출을 최소화하고, 관리 포인트를 줄일 수 있는 장점이 있습니다.1

var server = Server.builder()
   .http(8080)
   .service(“/hello/rest”, (ctx, req) -> HttpResponse.of(“Hello, world!”))
   .service(“/hello/thrift”, THttpService.of(new ThriftHelloService()))
   .service(“/hello/grpc”, GrpcService.builder()
                                      .addService(new GrpcHelloService())
                                      .build())
   .build();

엔터프라이즈 소프트웨어 개발자들은 공통적으로 인증, 로깅과 같은 부분을 어떻게 하면 효율적으로 처리할 수 있을지에 관심을 갖고 고민하게 됩니다. Armeria에서는 separation of concerns라고 부르는 이런 부분을 별도의 decorator로 관리할 수 있는 기능을 제공합니다.2

또한 기본 제공 외에 다른 decorator가 필요하다면, 직접 손쉽게 구현해서 특정 경로나 서비스에 바인딩해서 사용할 수도 있습니다. 예를 들어 아래 그림에서 구현한 AuthService 같은 경우에는 request에 인증 정보가 포함되어 있을 때만 서비스를 호출하고, 그렇지 않으면 ‘401 unauthorized’ 에러를 발생시키도록 했습니다.

다음으로 Armeria에서 Reactive Streams를 어떻게 지원하고 있는지 알아보겠습니다.

 

Armeria 내 HTTP/2 스트림

스트림(stream)은 흐르는 물과 같이 유기적으로 계속 연결되어 있어야 합니다. 어느 한곳만 뚫려있고 나머지가 막혀 있다면 곧 넘쳐 버립니다. Armeria에서는 데이터의 흐름을 유기적으로 제어하기 위해서 서버 내에선 Reactive Streams의 백 프레셔를 이용해 트래픽을 제어합니다. 또한 WINDOW_UPDATE를 이용한 HTTP/2 stream flow control로, 백 프레셔가 하위 네트워크 레이어에서 Armeria 서버와 여러분이 구현한 서비스, 그리고 데이터 저장소까지 유기적으로 연결할 수 있도록 만들었습니다. 

만약 서비스에서 사용하는 서버를 Armeria의 Reactive 서버로 당장 바꾸기 어렵다면, 아래와 같이 Armeria를 Reactive 프락시 서버로 활용할 수도 있습니다(참고).

// Use Armeria’s async & reactive HTTP/2 client.
var client = HttpClient.of(“h2c://backend”);
var server = Server.builder()
    .http(8080)          // Forward all requests reactively
    .service(“prefix:/“, (ctx, req) -> client.execute(req))
    .build();

Armeria 프락시 서버를 앞에 두면 여러분의 서버를 험난한 외부 인터넷으로부터 안전하게 보호할 수 있습니다.

 

Reactive Streams와 Armeria의 통합

Armeria로 직접 Reactive Streams를 지원하는 서버를 만든다면 더욱더 다양한 기능을 쓸 수 있는데요. Armeria에서 Reactive Streams를 활용하는 방법과 빌트인 publisher에 대해서 좀 더 자세히 알아보겠습니다.

 

Armeria HTTP 응답 Publisher 

Armeria의 응답은 헤더를 표현하는 HttpHeaders와 데이터를 표현하는 HttpData로 구성되어 있습니다. RxJava의 Observable을 Armeria의 HttpResponse로 바꾸는 과정을 단계별로 살펴보겠습니다. 

데이터가 준비되면 먼저 Observable의 map 연산자를 이용해서 dataStream을 HttpData로 감싸줍니다.

// 1. Fetch data from Reactive Streams Publisher
Observable<String> dataStream = Observable.just(“a”, “b”, “c”, “d”, “e”);
 
// 2. Convert string to Armeria HttpData
Observable<HttpData> httpDataStream = dataStream.map(HttpData::ofUtf8);

어떤 식으로 응답할지 결정하여 응답 헤더를 준비한 후, concat 연산자로 위에서 준비한 httpDataStream과 합치면, HttpHeaders에서 HttpData로 연결되는 하나의 스트림이 완성됩니다.

// 3. Prepare response headers
ResponseHeaders httpHeaders = ResponseHeaders.of(HttpStatus.OK);
 
// 4. Concat http header and body stream
Observable<HttpObject> responseStream = Observable.concat(Observable.just(httpHeaders), httpDataStream);

완성된 스트림을 Observable의 toFlowable 함수를 이용하여 Reactive Streams의 Flowable로 변환한 뒤, 최종적으로 Armeria의 HttpResponse로 감싸면 과정이 끝납니다.

// 5. Convert Observable to Armeria Response Stream
HttpResponse response = HttpResponse.of(responseStream.toFlowable(BackpressureStrategy.BUFFER));

 

Armeria 빌트인 Publisher

위 과정이 조금 복잡하거나, 혹은 지루하게 느껴졌을 수도 있습니다. 그래서 Armeria에서는 웹에서 스트림 데이터를 전달하는 방법의 표준인 JSON Text Sequences(RFC 7464)와 HTML5 규격인 Server-sent events에 대한 빌트인 Publisher를 제공합니다. 이 publisher를 이용하면 Reactive Streams를 좀 더 편리하게 웹으로 전달할 수 있습니다.

// Fetch data from Reactive Streams Publisher
Publisher<String> dataStream = Flux.just(“a”, “b”, “c”, “d”, “e”);
 
// Convert Publisher to JSON Text Sequences with Armeria HttpResponse
// with “application/json-seq” MIME type
HttpResponse httpResponse = JsonTextSequences.fromPublisher(dataStream);
 
// Convert Publisher to Server-sent Events with Armeria HttpResponse
// with “text/event-stream” MIME type
HttpResponse httpResponse = ServerSentEvents
        .fromPublisher(dataStream, SeverSentEvent::ofData);

또한 빌트인 publisher를 더욱 쉽게 사용할 수 있도록 RxJava 통합을 지원하고 있습니다. 어노테이션한 서비스에 @ProduceJsonSequence 어노테이션을 추가하고, Observable을 그대로 반환하면 Armeria에서 해당 프로토콜로 자동으로 변환합니다.

import io.reactivex.Observable;
import com.linecorp.armeria.server.annotation.ProducesJsonSequences;
 
class RxJavaService {
    @Get(“/json-streaming”)
      // Generate JSON Text Sequences
    @ProducesJsonSequences
    public Observable<String> json() {
          // Just return RxJava Observable! 
        return Observable.just(“a”, “b”, “c”);
    }
};

아래와 같이’JsonTextSequences’로 인코딩하면 JSON 문자열의 시작과 끝에 레코드 구분자(separator)와 라인 피드(line feed)가 추가됩니다. 또한 현재 프로토콜에 따라 HTTP/1과 HTTP/2로 전송될 때 다르게 동작해야 하는데요. Armeria에선 현재 연결된 프로토콜에 따라 알아서 적합한 형태의 데이터로 전송합니다. HTTP/2의 경우에는 Data frame안에 JSON 데이터를 나누어서 보내고, HTTP/1의 경우에는 chunked transfer encoding을 이용하여 나누어서 보냅니다.

 

Spring WebFlux 통합

Armeria는 다양한 라이브러리 및 프레임워크와의 연동을 지원합니다. Reactive Streams를 사용하기 위해서 이미 Spring WebFlux를 사용하고 있다면, 별도의 코드 수정 없이 ‘armeria-spring-boot-webflux-starter’를 의존성에 추가(참고)하는 것만으로 Armeria로의 마이그레이션을 끝낼 수 있습니다. 이런 방식으로 WebFlux의 네트워크 레이어인 Reactor-Netty를 Armeria의 Reactive 엔진으로 교체할 수 있습니다.

단순히 엔진만 교체하는 거라면 ‘그냥 WebFlux만 사용하면 되지 않을까?’라는 의문이 생길 수도 있는데요. 과연 어떤 이점이 있는 걸까요?

엔진을 교체하면 ArmeriaServerConfigurator를 통해서 Spring에 Armeria의 기능을 추가할 수 있어서, 기존 Spring에서는 지원하지 않는 Armeria만의 기능을 활용할 수 있습니다. 예를 들어 아래와 같이 REST API로 구성된 기존 Spring Webflux 서버에 gRPC나 Thrift 기능을 추가하여 기존과 동일한, 단일 포트에서 서비스할 수 있습니다.

@Configuration
public class ArmeriaConfiguration {
   // Configure the server by providing an ArmeriaServerConfigurator bean.
   @Bean
   public ArmeriaServerConfigurator armeriaServerConfigurator() {
       // Customize the server using the given ServerBuilder. For example:
       return builder -> {
           // Add DocService that enables you to send gRPC and Thrift requests from web browser.
           builder.serviceUnder(“/docs”, new DocService());
           // Log every message which the server receives and responds.
           builder.decorator(LoggingService.newDecorator());
           // Write access log after completing a request.
           builder.accessLogWriter(AccessLogWriter.combined(), false);
           // You can also bind asynchronous RPC services such as Thrift and gRPC:
           builder.service(THttpService.of(…));
           builder.service(GrpcService.builder()…build());
       };
   }
}

또한 이외에도 앞서 설명드린 decorator 기능을 활용해서 더욱더 풍성한 서비스로 만들 수 있습니다.

 

gRPC 스트림 지원

gRPC는 StreamObserver를 이용하여 스트림을 지원합니다(참고). 그리고 Reactive Streams에선 gRPC의 StreamObserver로 손쉽게 변환할 수 있습니다. 예제를 통해서 알아보겠습니다.

우선 아래와 같이 protobuf를 이용해 데이터를 어떤 식으로 주고받을지, 인터페이스를 정의합니다. 

syntax = “proto3”;
package users;
option java_package = “users”;
 
service UserService {
  // Returns all user information
  rpc getAllUsers(UserRequest) returns (stream User) {}
 
  // Push to stream of users
  rpc pushToUsers(stream User) returns (Result) {}
}

전체 회원에게 메일 혹은 푸시 메시지를 보내고 싶다면 먼저 모든 사용자 정보를 조회해야 하는데요. 만약 회원 수가 너무 많다면 데이터를 한 번에 전송하는 게 어려울 수 있습니다. 그럴 때 아래와 같은 방법으로 대량의 사용자 정보를 반환하는 스트림 서버를 gRPC로 구축할 수 있습니다.

  1. ProjectReactor의 Flux를 Publisher로 사용하여 저장소에서 스트림 형태로 데이터를 조회합니다.
  2. gRPC의 StreamObservser로 변환하여 외부로 전송합니다.
// Implement interfaces generated by gRPC
public final class UserServiceImpl extends UserServiceImplBase {
    @Override
    public void getAllUsers(UserRequest request, StreamObserver<User> responseObserver) {
        final Flux<User> userPublisher = userRepo.findAll();
        publisher.subscribe(responseObserver::onNext,
                            responseObserver::onError,
                            responseObserver::onCompleted);
    }
}

StreamObserver에도 Reactive Streams와 유사하게 onNextonErroronCompleted 함수가 존재하기 때문에 각각의 API를 매칭해서 연결하기만 하면 됩니다.

이번에는 수신 측에서 전체 사용자의 정보를 스트림으로 받는 것을 생각해 볼 건데요. Processor를 활용하겠습니다. Processor는 별도의 API를 가지고 있지 않고, Subscriber와 Publisher, 이 두 개의 인터페이스를 상속받고 있는 게 전부인데요. 데이터를 구독 받고, 구독한 데이터를 다시 발행할 때 유용하게 사용할 수 있습니다. StreamObserver의 onNext로 들어온 새로운 사용자 정보를 Processor의 onNext 함수로 전달할 수 있고, 이를 다시 구독해서 원하는 작업을 추가로 수행할 수 있습니다.

@Override
public StreamObserver<User> pushToUsers(StreamObserver<Result> responseObserver) {
    Processor<User, User> processor = EmitterProcessor.create();
    Publisher<User> publisher = processor;
    Subscriber<User> subscriber = processor;
    // Push one-by-one by subscribing publisher
    ...
     
      return new StreamObserver<User>() {
        // subscribe user data 
       
        @Override public void onNext(User user) { processor.onNext(user); }
        @Override public void onError(Throwable throwable) { processor.onError(throwable); }
        @Override public void onCompleted() {
            responseObserver.onNext(Result.newBuilder().setStatus(200).build());
            responseObserver.onCompleted();
        }
    };
}

작성한 gRPC 코드를 Armeria에서 실행해보겠습니다.

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.grpc.GrpcService;
 
// Add your grpc service to Armeria GrpcService
var grpcService = GrpcService.builder()
                             .addService(new UserServiceImpl())
                             .build();
 
var server = Server.builder()
                   .http(8080)
                   .serviceUnder(“/grpc”, grpcService)
                   .serviceUnder(“/docs”, new DocService())
                   .build();

이렇게 만든 스트림 서버를 이용해 작은 메모리로도 많은 양의 데이터를 스트림 형태로 처리할 수 있습니다.

 

마이크로 서비스를 위한 Armeria의 기능

Armeria를 통해 Reactive Streams를 활용하면 많은 양의 데이터와 트랙픽을 유연하게 처리할 수 있습니다. 또한 RPC를 통해서 마이크로 서비스간 통신도 쉽게 처리할 수 있습니다.

이외에도 아래와 같이 마이크로 서비스에 필요한 다양한 기능을 제공하고 있습니다.

  • 클라우드 환경에서 서버의 위치를 파악하기 위해 Kubernetes 유형의 DNS(Domain Name System)와 ZooKeeper를 활용한 서비스 검색(discovery)을 제공(참고)하고 있습니다.
  • 검색된 서비스는 클라이언트 측 로드 밸런싱으로 직접 부하를 분산하며 서버와 통신합니다. 이를 통해 장애점을 줄일 수 있습니다. 
  • L7이 아닌 클라이언트가 직접 서버 상태를 체크할 수 있습니다. 
  • 로그를 확인할 때 분산된 서버에 접근하지 않고도 확인할 수 있도록 Kafka로 접속 로그를 전송합니다.
  • Micrometer를 활용해서 필요한 수치를 설정, 수집하여 Prometheus나 Netflix의 Atlas와 같은 애플리케이션 모니터링 툴로 전송할 수 있습니다.

 

마치며

두 번에 걸쳐 Reactive Streams와 Armeria에 대한 내용을 공유드렸습니다. 함께 살펴보았듯 Armeria를 이용하면, Reactive Streams와 고성능, 비동기, RPC, HTTP/2를 지원하면서 장애에 탄력적인, 보다 안전한 서버를 구축할 수 있습니다.

Armeria에선 포스팅에서 언급한 기능 외에 더 많은 기능을 제공하고 있습니다. 이와 관련하여 Armeria의 공식 홈페이지와 아래 링크를 참고하시기 바랍니다.

 


 

  1. gRPC 서버를 Armeria에서 구동하는 법과 Thrift 서버를 Armeria에서 구동하는 법을 참고하시기 바랍니다.
  2. Armeria에서 기본적으로 제공하는 다양한 decorator는 공식 문서와 아래 Javadoc에서 확인해 볼 수 있습니다. 

Related Post