Let’s Play with Reactive Streams on Armeria – Part 2

In the first part of this blog post, we took a look at the basic concepts of Reactive Streams. In part 2 of this blog post, I’d like to tell you about how we use Reactive Streams with Armeria.

What’s Armeria?

Armeria is an open-source asynchronous HTTP/2, RPC, REST client/server library based on Java 8, Netty, Thrift, and gRPC. While Armeria is a lightweight microservices framework, its capabilities is comparable to existing full stack web frameworks.

Let’s first take a look at some prerequisite knowledge for setting up a Reactive Streams server on Armeria.

Supported protocols

Armeria supports both HTTP/1 and HTTP/2, and these protocols both support cleartext and TLS (Transport Layer Security) cryptographic protocols. To ensure compatibility between HTTP/1 and HTTP/2, Armeria supports both HTTP/2’s connection preface and HTTP/1’s upgrade request.

Furthermore, gRPC and Thrift works with both HTTP/1 and HTTP/2 on Armeria. This is a unique feature of Armeria. gRPC normally doesn’t support HTTP/1 and Thrift normally doesn’t support HTTP/2. However, Armeria’s wide array of support allows it to be used in a variety of business environments. In Linux environments, Armeria supports JNI-based (Java Native Interface) socket IOs and BoringSSL-based TLS to allow better performance during production.

Let’s take a closer look at Armeria along with some sample code.

A closer look at Armeria

Armeria is a user-friendly API with simple and easy-to-use code. You only need to write the following five lines if you want to run a “Hello world” server.

// Build your own server under 5 lines.
var server = Server.builder()
       .http(8080)
       .service(“/“, (ctx, req) -> HttpResponse.of(“Hello, World!”))
       .build();
 
server.start();

The ability to easily run servers is a core feature for running separate business components on independent servers in a microservices environment.

Armeria also allows you to simplify server architecture. You don’t need to run sidecar patterns such as Nginx or static web servers such as Apache httpd to use HTTPS or HTTP/2. As mentioned, Armeria supports JNI-based socket IOs and BoringSSL in Linux environments, making it relatively less prone to performance impact. You can also serve static files such as JS, CSS, or images.

var server = Server.builder()
       .http(8080)
       .https(8443) // HTTPS support
       .tlsSelfSigned()
       .service(“/“, (ctx, req) -> HttpResponse.of(“Hello, World!”))
       .build();
server.start();

You no longer need to add network hops with Armeria, allowing you to reduce points of failure, use resources more efficiently, simplify architecture, monitor activity, and dynamically expand your servers.

There’s even more. Armeria comes with built-in annotation that lets you use its features even more easily. For example, you can easily write routing code by setting hello as a prefix, and name as a path variable like the code below (For more information on using annotations, refer to official Armeria documentation).

import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.server.annotation.PathPrefix;
 
// Use built-in annotations for mapping path and parameter injection
@PathPrefix(“/hello”)
class HelloService {
   @Get(“/:name”)
   public String hello(@Param String name) {
       return String.format(“Hello, %s!”, name);
   }
}

Armeria also comes with special features that allow you to serve multiple RPC protocols simultaneously. You can provide REST API, gRPC, and Thrift on a single server, allowing you to flexibly adapt to any business requirements or architecture changes. The single port design enables you to efficiently use resources, minimize unnecessary exposure, and reduce potential maintenance costs.1

var server = Server.builder()
   .http(8080)
   .service(“/hello/rest”, (ctx, req) -> HttpResponse.of(“Hello, world!”))
   .service(“/hello/thrift”, THttpService.of(new ThriftHelloService()))
   .service(“/hello/grpc”, GrpcService.builder()
                                      .addService(new GrpcHelloService())
                                      .build())
   .build();

Enterprise software developers would all agree that they’ve spent countless hours coming up with ways to efficiently process authentication and logging. With Armeria, you can put those worries aside. You can use a separate decorator to manage this separation of concerns.2

If you wish to add an additional decorator, you can always manually implement and bind one to a specific path or service. For example, AuthService in the figure below only calls the service when a request includes authentication info, and returns a “401 unauthorized” error if not.

Next, let’s take a look at how Armeria works with Reactive Streams.

HTTP/2 streams inside Armeria

As the name suggests, “streams” need to maintain a connection and let data constantly flow. If there is only one opening while everywhere else is closed, it will lead to an overload. Armeria uses the back pressure feature of Reactive Streams to dynamically control data flow inside servers. Also, back pressure that utilizes HTTP/2 stream flow control using WINDOW_UPDATE allows you to dynamically connect the Armeria server with your services and repositories from subnetwork layers.

If converting your service’s server to an Armeria Reactive server is difficult at the moment, you could use an Armeria Reactive proxy server by following the sample code and instructions available on the repository.

// Use Armeria’s async & reactive HTTP/2 client.
var client = HttpClient.of(“h2c://backend”);
var server = Server.builder()
    .http(8080)          // Forward all requests reactively
    .service(“prefix:/“, (ctx, req) -> client.execute(req))
    .build();

With an Armeria proxy server at the forefront, you can keep your server safe from any external threats on the internet.

Consolidating Reactive Streams and Armeria

If you create an Armeria server with direct support of Reactive Streams, even more features are available to you. Let’s take a closer look at how you can use Reactive Streams on Armeria, and some details about the built-in publisher.

Armeria HTTP response publisher

Responses on Armeria are comprised of HttpHeaders and HttpData. I will explain how you can convert an RxJava Observable to an Armeria HttpResponse step-by-step.

Once your data is ready, wrap dataStream with HttpData using the Observable map operator.

// 1. Fetch data from Reactive Streams Publisher
Observable<String> dataStream = Observable.just(“a”, “b”, “c”, “d”, “e”);
 
// 2. Convert string to Armeria HttpData
Observable<HttpData> httpDataStream = dataStream.map(HttpData::ofUtf8);

Once you have a confirmed response and a response header, combine it with the httpDataStream prepared above using the concat operator. This will complete a connected stream from HttpHeaders to HttpData.

// 3. Prepare response headers
ResponseHeaders httpHeaders = ResponseHeaders.of(HttpStatus.OK);
 
// 4. Concat http header and body stream
Observable<HttpObject> responseStream = Observable.concat(Observable.just(httpHeaders), httpDataStream);

Convert the completed stream to a Reactive Streams Flowable using the Observable toFlowable function, and wrap it with Armeria’s HttpResponse to complete the conversion.

// 5. Convert Observable to Armeria Response Stream
HttpResponse response = HttpResponse.of(responseStream.toFlowable(BackpressureStrategy.BUFFER));

Armeria’s built-in publisher

The steps above may seem too complicated or boring. That’s why Armeria provides a built-in publisher for JSON Text Sequences(RFC 7464), the standard way of sending stream data on the web, and Server-sent events, the HTML5 standard. This publisher will allow you to send Reactive Streams on the web more easily.

// Fetch data from Reactive Streams Publisher
Publisher<String> dataStream = Flux.just(“a”, “b”, “c”, “d”, “e”);
 
// Convert Publisher to JSON Text Sequences with Armeria HttpResponse
// with “application/json-seq” MIME type
HttpResponse httpResponse = JsonTextSequences.fromPublisher(dataStream);
 
// Convert Publisher to Server-sent Events with Armeria HttpResponse
// with “text/event-stream” MIME type
HttpResponse httpResponse = ServerSentEvents
        .fromPublisher(dataStream, SeverSentEvent::ofData);

RxJava consolidation support is available for those looking to use the built-in publisher even more easily. Add the @ProduceJsonSequence annotation to the annotated service and return Observable for automatic conversion on Armeria to the protocols mentioned above.

import io.reactivex.Observable;
import com.linecorp.armeria.server.annotation.ProducesJsonSequences;
 
class RxJavaService {
    @Get(“/json-streaming”)
      // Generate JSON Text Sequences
    @ProducesJsonSequences
    public Observable<String> json() {
          // Just return RxJava Observable! 
        return Observable.just(“a”, “b”, “c”);
    }
};

Encoding a JSON text string with ‘JsonTextSequences’ will add a separator and line feed to the beginning and end respectively. Depending on the protocol, different operations are required when sending data through HTTP/1 or HTTP/2. Armeria automatically finds the appropriate format according to the currently connected protocol. JSON data is divided inside the Data frame for HTTP/2, and chunked transfer encoding is used for HTTP/1.

Migrating Spring WebFlux

Armeria supports various libraries and frameworks. If you are already using Spring WebFlux to use Reactive Streams, you only need to add armeria-spring-boot-webflux-starter to your dependencies to complete migration to Armeria. You can replace Reactor-Netty, a WebFlux network layer, with Armeria’s Reactive engine with this way.

You may be thinking to yourself “Shouldn’t I just use WebFlux if I only want to replace  the engine?” What advantages are there to going through the process above?

You can add Armeria features to Spring with ArmeriaServerConfigurator if you replace your engine, allowing you to use unique Armeria features that aren’t normally available on Spring. For example, you can add gRPC and Thrift features to a REST API Spring WebFlux server, and serve it through the same single port.

@Configuration
public class ArmeriaConfiguration {
   // Configure the server by providing an ArmeriaServerConfigurator bean.
   @Bean
   public ArmeriaServerConfigurator armeriaServerConfigurator() {
       // Customize the server using the given ServerBuilder. For example:
       return builder -> {
           // Add DocService that enables you to send gRPC and Thrift requests from web browser.
           builder.serviceUnder(“/docs”, new DocService());
           // Log every message which the server receives and responds.
           builder.decorator(LoggingService.newDecorator());
           // Write access log after completing a request.
           builder.accessLogWriter(AccessLogWriter.combined(), false);
           // You can also bind asynchronous RPC services such as Thrift and gRPC:
           builder.service(THttpService.of(…));
           builder.service(GrpcService.builder()…build());
       };
   }
}

The decorator features mentioned above are also available, to make your service more feature-rich.

Support for gRPC streams

gRPC supports streams through StreamObserver, and you can easily convert to gRPC’s StreamObserver with Reactive Streams. Let’s take a look at the example below.

First, decide how to send and receive data using protobuf, and define your interface.

syntax = “proto3”;
package users;
option java_package = “users”;
 
service UserService {
  // Returns all user information
  rpc getAllUsers(UserRequest) returns (stream User) {}
 
  // Push to stream of users
  rpc pushToUsers(stream User) returns (Result) {}
}

In order to send emails and push messages to all users, you must first look up the information of all users. If there are too many users, it may be difficult to send the data at once. In cases like this, you can build a stream server that returns large amounts of user information on gRPC by following the instructions below.

  1. Use ProjectReactor’s Flux as a publisher to look up data on the repository in the format of a stream.
  2. Convert to gRPC StreamObservser and send the data to an external location.
// Implement interfaces generated by gRPC
public final class UserServiceImpl extends UserServiceImplBase {
    @Override
    public void getAllUsers(UserRequest request, StreamObserver<User> responseObserver) {
        final Flux<User> userPublisher = userRepo.findAll();
        publisher.subscribe(responseObserver::onNext,
                            responseObserver::onError,
                            responseObserver::onCompleted);
    }
}

StreamObserver, like Reactive Streams, has onNextonErroronCompleted functions that can be matched and linked to APIs.

This time let’s think how you would receive the data containing all user information. For this demonstration I will be using ProcessorProcessor doesn’t have separate API, and simply inherits Subscriber and Publisher. You can use Processor to easily subscribe to data, and publish the subscribed data easily. You can send new user info that was received through StreamObserver's onNext using Processor's onNext function, and can further perform additional tasks by resubscribing.

@Override
public StreamObserver<User> pushToUsers(StreamObserver<Result> responseObserver) {
    Processor<User, User> processor = EmitterProcessor.create();
    Publisher<User> publisher = processor;
    Subscriber<User> subscriber = processor;
    // Push one-by-one by subscribing publisher
    ...
     
      return new StreamObserver<User>() {
        // subscribe user data 
       
        @Override public void onNext(User user) { processor.onNext(user); }
        @Override public void onError(Throwable throwable) { processor.onError(throwable); }
        @Override public void onCompleted() {
            responseObserver.onNext(Result.newBuilder().setStatus(200).build());
            responseObserver.onCompleted();
        }
    };
}

Let’s run the gRPC code that we wrote on Armeria.

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.grpc.GrpcService;
 
// Add your grpc service to Armeria GrpcService
var grpcService = GrpcService.builder()
                             .addService(new UserServiceImpl())
                             .build();
 
var server = Server.builder()
                   .http(8080)
                   .serviceUnder(“/grpc”, grpcService)
                   .serviceUnder(“/docs”, new DocService())
                   .build();

With a stream server like this, you can stream large amounts of data using only a small amount of memory.

Armeria features for microservices

Using Reactive Streams through Armeria allows you to easily handle large amounts of data and traffic. RPC allows your microservices to communicate with each other more easily as well.

In addition, here are some more features catered towards microservices:

  • Kubernetes-type DNS (Domain Name System) and service discovery with ZooKeeper for easily locating servers in a cloud environment.
  • Client-side load-distribution for searched services during communication with the server. Fewer points of failure.
  • Direct server diagnosis with non-L7 clients.
  • Automatic access logging to Kafka without the need to access distributed servers.
  • Easy transfer of metrics to application monitoring tools such as Netflix’s Atlas and Prometheus through Micrometer’s filters.

Conclusion

Through parts 1 and 2 of this blog post, we took a look at Reactive Streams and Armeria. Armeria allows you to build an asynchronous server that is safe, less prone to failure, and can harness the high performance of Reactive Streams, while also supporting RPC and HTTP/2.

The features mentioned in this blog post aren’t everything there is to Armeria. For more information on what Armeria can do for you, refer to the official website and the following sources.


  1. Refer to Running a gRPC service and Running a Thrift service.
  2. For more information on the various decorators provided with Armeria, refer to the Armeria and Javadoc official documentation.

Related Post