LINE株式会社は、2023年10月1日にLINEヤフー株式会社になりました。LINEヤフー株式会社の新しいブログはこちらです。 LINEヤフー Tech Blog

Blog


Let’s play Reactive Streams with Armeria vol.2

こんにちは。LINE PlusでオープンソースソフトウェアのArmeriaとCentral Dogmaを開発しているUM IKHUNです。前回の記事では、Reactive Streamsの概念を解説しました。今回の記事では、Reactive Streamsをオープンソース非同期のHTTP/2、RPC、RESTクライアント/サーバーライブラリーであるArmeriaで使用する方法について紹介したいと思います。

What’s Armeria?

Armeriaは、Java 8およびNetty、Thrift、gRPCをベースにしたオープンソース非同期HTTP/2、RPC、RESTクライアント/サーバーライブラリーです。Armeriaは、軽量(Lightweight)のマイクロサービスフレームワークですが、サポートする機能は、他のフルスタック(full stack)ウェブフレームワークと比較しても劣りません。

まず、ArmeriaでReactive Streamsを活用したサーバーを実現するために、基本的に知っておくべきことを説明します。

サポートするプロトコル

Armeriaでは、HTTP/1とHTTP/2を両方ともサポートしており、この2つのプロトコルはcleartextとTLS(Transport Layer Security)暗号化通信をすべてサポートしています。HTTP/1からHTTP/2への互換性をサポートするためのプロトコルアップグレードについては、HTTP/2の「connection preface」とHTTP/1の「upgrade request」を両方ともサポートしています。

また、ArmeriaではgRPCとThriftが、HTTP/1とHTTP/2の両方で動作します。これはArmeriaならではの特別な機能です。gRPCはHTTP/1をサポートせず、既存のThriftではHTTP/2をサポートしません。しかし、Armeriaではすべてサポートしており、多様なビジネス環境で柔軟に使用できます。また、Linux環境では、JNI(Java Native Interface)ベースのソケットIOとBoringSSLベースのTLSにより、一層スピーディに本番環境で使用できます。

では、サンプルコードを見ながらArmeriaについて1つずつ説明します。

サンプルコードで見るArmeria

Armeriaは、ユーザーに優しいAPIです。コードが簡潔で使いやすくなっています。「Hello world」サーバーを実行したいときは、以下のように5行を作成するたけで済みます。

// Build your own server under 5 lines.
var server = Server.builder()
       .http(8080)
       .service("/", (ctx, req) -> HttpResponse.of("Hello, World!"))
       .build();
 
server.start();

サーバーを簡単に実行できるというのは、マイクロサービス環境において各ビジネスコンポーネントを分離し、独立したサーバーとして管理するときにポイントになることです。

また、サーバーのアーキテクチャをシンプルにすることができます。HTTPSやHTTP/2を使用するために別途のサイドカー(sidecar)であるNginxやApache Httpdのような静的Webサーバーを実行する必要がありません。前述のように、Linux環境ではJNIベースのソケットIOとBoringSSLをサポートするため、別途の性能低下を考慮する必要はありません。なお、JSやCSS、画像のような静的ファイルをホストする機能も提供しています(参照)。 

var server = Server.builder()
       .http(8080)
       .https(8443) // HTTPS support
       .tlsSelfSigned()
       .service("/", (ctx, req) -> HttpResponse.of("Hello, World!"))
       .build();
server.start();

ネットワークホップ(hop)を追加せずに済むため、障害点を減らすことができ、リソースを節約できるので効率よく通信できます。さらに、アーキテクチャがシンプルでモニタリングが簡単になり、サーバーを柔軟に拡張できます。 

その他にも有効な機能が多くあります。その1つが、Armeriaでビルトインで提供するアノテーション(annotation)です。アノテーションを利用すると、Armeriaの機能をより簡単に使用できます。例えば、以下のようにhelloをプリフィックス(prefix)として、nameを経路変数として設定し、ルーティングするコードを簡単に作成できます(Armeriaの公式文書を参照すると、その他に多様なアノテーションをより詳しく見ることができます)。

import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.server.annotation.PathPrefix;
 
// Use built-in annotations for mapping path and parameter injection
@PathPrefix("/hello")
class HelloService {
   @Get("/:name")
   public String hello(@Param String name) {
       return String.format("Hello, %s!", name);
   }
}

Armeriaには、他の種類のRPCプロトコルを同時に扱える特別な機能もあります。1つのサーバーでREST APIとgRPC、Thriftをすべて提供できるため、ビジネスの要望やアーキテクチャの変化に柔軟に対応できます。また、単一ポートで提供しているので、リソースを効率よく使用してセキュリティの面で不要な露出を最低限に抑え、管理ポイントを削減できるメリットがあります。1

var server = Server.builder()
   .http(8080)
   .service("/hello/rest", (ctx, req) -> HttpResponse.of("Hello, world!"))
   .service("/hello/thrift", THttpService.of(new ThriftHelloService()))
   .service("/hello/grpc", GrpcService.builder()
                                      .addService(new GrpcHelloService())
                                      .build())
   .build();

エンタープライズソフトウェアの開発者は一般的に、認証やロギングなどをいかに効率よく処理できるかに関心を持って悩むと思います。Armeriaでは、separation of concernsと呼んでいるこのような部分を別途decoratorで管理できる機能を提供しています。2

また、標準で提供されていないdecoratorが必要であれば、直接簡単に実装し、特定経路やサービスにバインディングして使用することもできます。例えば、下図で実装したAuthServiceでは、requestに認証情報が含まれているときのみサービスを呼び出し、そうでなければ「401 unauthorized」エラーを発生させるようにしました。

次に、ArmeriaでReactive Streamsをどのようにサポートしているかを説明します。

Armeria内のHTTP/2ストーリム

ストリーム(stream)は流れている水のように、有機的に繋がり続ける必要があります。ある1か所だけ開いていて他のところは詰まっていると、すぐ溢れてしまいます。Armeriaでは、データの流れを有機的にコントロールするために、サーバー内ではReactive Streamsのバックプレッシャーを利用してトラフィックをコントロールします。また、WINDOW_UPDATEを利用したHTTP/2 stream flow controlで、バックプレッシャーが配下のネットワークレイヤーでArmeriaのサーバーと皆さんが実装したサービス、そしてデータのリポジトリまで有機的に繋げられるようにしました。 

もし皆さんのサービスで使用しているサーバーを、今すぐArmeriaのReactiveサーバーに取り換えることができない事情があるなら、以下のようにArmeriaをReactiveプロキシサーバーとして活用することもできます(参照)。

// Use Armeria’s async & reactive HTTP/2 client.
var client = HttpClient.of("h2c://backend");
var server = Server.builder()
    .http(8080)          // Forward all requests reactively
    .service("prefix:/", (ctx, req) -> client.execute(req))
    .build();

Armeriaプロキシサーバーを前に配置すると、皆さんのサーバーを厳しい外部のインターネットから安全に保護できます。

Reactive StreamsとArmeriaの統合

Armeriaで直接Reactive Streamsをサポートするサーバーを作ると、より多様な機能を使用できます。ArmeriaでReactive Streamsを活用する方法とビルトインpublisherについてより詳しく説明します。

Armeria HTTPレスポンスのPublisher

Armeriaのレスポンスは、ヘッダーを表現するHttpHeadersとデータを表現するHttpDataで構成されています。ここでは、RxJavaのObservableをArmeriaのHttpResponseに取り換える過程を、ステップごとに見てみましょう。 

データの準備が終わったら、まずObservableのmap演算子を利用してdataStreamをHttpDataでラップします。

// 1. Fetch data from Reactive Streams Publisher
Observable<String> dataStream = Observable.just("a", "b", "c", "d", "e");
 
// 2. Convert string to Armeria HttpData
Observable<HttpData> httpDataStream = dataStream.map(HttpData::ofUtf8);

どのようにレスポンスするかを決めてレスポンスヘッダーを準備した後、concat演算子を利用して、上記で準備したhttpDataStreamと合わせると、HttpHeadersからHttpDataに繋がる1つのストリームが完成します。

// 3. Prepare response headers
ResponseHeaders httpHeaders = ResponseHeaders.of(HttpStatus.OK);
 
// 4. Concat http header and body stream
Observable<HttpObject> responseStream = Observable.concat(Observable.just(httpHeaders), httpDataStream);

完成したストリームをObservableのtoFlowable関数を利用してReactive StreamsのFlowableに変換した後、最終的にArmeriaのHttpResponseでラップすれば終わりです。

// 5. Convert Observable to Armeria Response Stream
HttpResponse response = HttpResponse.of(responseStream.toFlowable(BackpressureStrategy.BUFFER));

ArmeriaのビルトインPublisher

上記の過程がやや複雑に、あるいは長くて退屈に感じられたかもしれません。そこで、ArmeriaではWebでストリームデータを送信する方法の標準であるJSON Text Sequences(RFC 7464)とHTML5の規格であるServer-sent eventsに対するビルトインPublisherを提供します。このpublisherを利用すると、Reactive Streamsをより便利にWebで送信できます。

// Fetch data from Reactive Streams Publisher
Publisher<String> dataStream = Flux.just("a", "b", "c", "d", "e");
 
// Convert Publisher to JSON Text Sequences with Armeria HttpResponse
// with "application/json-seq" MIME type
HttpResponse httpResponse = JsonTextSequences.fromPublisher(dataStream);
 
// Convert Publisher to Server-sent Events with Armeria HttpResponse
// with "text/event-stream" MIME type
HttpResponse httpResponse = ServerSentEvents
        .fromPublisher(dataStream, SeverSentEvent::ofData);

また、ビルトインpublisherをより簡単に使用できるようにRxJava統合をサポートしています。アノテーションのサービスに@ProducesJsonSequencesアノテーションを追加し、Observableをそのまま返すと、Armeriaで当該プロトコルで自動的に変換します。

import io.reactivex.Observable;
import com.linecorp.armeria.server.annotation.ProducesJsonSequences;
 
class RxJavaService {
    @Get("/json-streaming")
      // Generate JSON Text Sequences
    @ProducesJsonSequences
    public Observable<String> json() {
          // Just return RxJava Observable! 
        return Observable.just("a", "b", "c");
    }
};

以下のように「JsonTextSequences」でエンコードすると、JSON文字列の最初と最後のところにレコード分離文字(separator)とラインフィード(line feed)が追加されます。また、現在のプロトコルによってHTTP/1やHTTP/2に送信される際、違う動作になるべきです。Armeriaでは現在繋がっているプロトコルに従って、適したタイプのデータを送信します。HTTP/2の場合は、Data frameの中にJSONデータを分けて送り、HTTP/1の場合はchunked transfer encodingを利用して分けて送ります。

Spring WebFluxの統合

Armeriaは、さまざまなライブラリーおよびフレームワークとの連携をサポートします。Reactive Streamsを使用するために、すでにSpring WebFluxを使用している場合は、コードを別途修正せずに「armeria-spring-boot-webflux-starter」を依存性に追加(参照)するだけで、Armeriaへのマイグレーションを完了できます。このような方法で、WebFluxのネットワークレイヤーであるReactor-NettyをArmeriaのReactiveエンジンに取り換えることができます。

単にエンジンを取り換えるだけなら「ただWebFluxだけ使用すればいいんじゃないか」という疑問が湧くかもしれません。エンジンを取り換えることに、果たしてどのようなメリットがあるのでしょう。

エンジンを取り換えると、ArmeriaServerConfiguratorによりSpringでArmeriaの機能を追加できるため、既存のSpringではサポートしないArmeriaならではの機能を活用できます。例えば、以下のようにREST APIで構成される既存のSpring WebfluxサーバーにgRPCやThrift機能を追加し、従来と同じく単一ポートでサービスできます。

@Configuration
public class ArmeriaConfiguration {
   // Configure the server by providing an ArmeriaServerConfigurator bean.
   @Bean
   public ArmeriaServerConfigurator armeriaServerConfigurator() {
       // Customize the server using the given ServerBuilder. For example:
       return builder -> {
           // Add DocService that enables you to send gRPC and Thrift requests from web browser.
           builder.serviceUnder("/docs", new DocService());
           // Log every message which the server receives and responds.
           builder.decorator(LoggingService.newDecorator());
           // Write access log after completing a request.
           builder.accessLogWriter(AccessLogWriter.combined(), false);
           // You can also bind asynchronous RPC services such as Thrift and gRPC:
           builder.service(THttpService.of(…));
           builder.service(GrpcService.builder()…build());
       };
   }
}

なお、他にも前述で説明したようにdecorator機能を活用し、さらに機能が豊富なサービスを構築できます。

gRPCのストリームサポート

gRPCは、StreamObserverを利用してストリームをサポートします(参照)。Reactive Streamsで発行(publish)されるデータをStreamObserverで簡単に送信できます。サンプルで見てみます。

まず以下のようにprotobufを利用してデータをどのようにやり取りするか、インターフェースを定義します。 

syntax = "proto3";
package users;
option java_package = "users";
 
service UserService {
  // Returns all user information
  rpc getAllUsers(UserRequest) returns (stream User) {}
 
  // Push to stream of users
  rpc pushToUsers(stream User) returns (Result) {}
}

会員全体にメールまたはプッシュメッセージを送信したいときは、まずすべてのユーザー情報を取得する必要があります。もし会員数があまりにも多い場合、データを一度に送信することは難しいかもしれません。そのような場合は、以下のような方法で大量のユーザー情報を返すストリームサーバーをgRPCで構築することができます。

  1. ProjectReactorのFluxをPublisherとして使用し、リポジトリからストリームでデータを取得します。
  2. データを、gRPCのStreamObservserに変換して外部へ送信します。
// Implement interfaces generated by gRPC
public final class UserServiceImpl extends UserServiceImplBase {
    @Override
    public void getAllUsers(UserRequest request, StreamObserver<User> responseObserver) {
        final Flux<User> userPublisher = userRepo.findAll();
        publisher.subscribe(responseObserver::onNext,
                            responseObserver::onError,
                            responseObserver::onCompleted);
    }
}

StreamObserverにもReactive Streamsと同様にonNext、onError、onCompleted関数が存在するため、それぞれのAPIに委譲するだけで終わります。

今度は受信する側で、すべてのユーザー情報をストリームで受けることを考えてみましょう。ストリームで受けるには、Processorを活用します。Processorは別途のAPIを持っておらず、SubscriberとPublisher、この2つのインターフェースを継承しているだけです。Processorは、データを購読し、購読したデータを再度発行する際に役に立ちます。StreamObserverのonNextで入ってきた新しいユーザー情報をProcessorのonNext関数で送信でき、それを再度購読して必要な作業を追加で行うことができます。

@Override
public StreamObserver<User> pushToUsers(StreamObserver<Result> responseObserver) {
    Processor<User, User> processor = EmitterProcessor.create();
    Publisher<User> publisher = processor;
    Subscriber<User> subscriber = processor;
    // Push one-by-one by subscribing publisher
    ...
      
      return new StreamObserver<User>() {
        // subscribe user data 
       
        @Override public void onNext(User user) { processor.onNext(user); }
        @Override public void onError(Throwable throwable) { processor.onError(throwable); }
        @Override public void onCompleted() {
            responseObserver.onNext(Result.newBuilder().setStatus(200).build());
            responseObserver.onCompleted();
      }
    };
}

作成したgRPCコードをArmeriaで実行してみます。

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.grpc.GrpcService;
 
// Add your grpc service to Armeria GrpcService
var grpcService = GrpcService.builder()
                             .addService(new UserServiceImpl())
                             .build();
 
var server = Server.builder()
                   .http(8080)
                   .serviceUnder("/grpc", grpcService)
                   .serviceUnder("/docs", new DocService())
                   .build();

このようにして作ったストリームサーバーを利用し、小さいメモリーでも大量のデータをストリームで処理することができます。

マイクロサービスのためのArmeriaの機能

ArmeriaによりReactive Streamsを活用すると、大量のデータとトラフィックを柔軟に処理できます。また、RPCでマイクロサービス間の通信も簡単に処理できます。

他にも以下のように、マイクロサービスに必要なさまざまな機能を提供しています。

  • クラウド環境でサーバーの位置を把握するため、KubernetesタイプのDNS(Domain Name System)とZooKeeperを活用したサービス検索(discovery)を提供(参照)しています。
  • 検索されたサービスは、クライアント側のロードバランシングで直接負荷を分散してサーバーと通信します。それにより障害点を減らすことができます。
  • L7ではないクライアントが、直接サーバーの状態をチェックできます。
  • ログを確認する際、分散されたサーバーにアクセスせずに確認できるように、Kafkaでアクセスログを送信します。
  • Micrometerを活用して必要な数値を設定・収集し、PrometheusやNetflixのAtlasのようなアプリケーションモニタリングツールで送信できます。

おわりに

2回に渡ってReactive StreamsとArmeriaについて紹介しました。一緒に見てみたようにArmeriaを利用すると、Reactive Streamsと高性能、非同期、RPC、HTTP/2をサポートするうえ障害に柔軟な、より安全なサーバーを構築できます。

Armeriaでは、記事で触れた機能以外にもさらに多くの機能を提供しています。それに関しては、Armeriaの公式ホームページと以下のリンクをご参照ください。


1. gRPCサーバーをArmeriaで起動する方法ThriftサーバーをArmeriaで起動する方法をご参照ください。

2. Armeriaで標準提供するさまざまなdecoratorは、公式文書と以下のJavadocでご確認できます。