Akka HTTPの仕組みを理解する

初めまして、Ads Platform開発チームの岡田(@ocadaruma)です。

この記事はLINE Advent Calendar 2017の17日目の記事です。

今回、個人的に以前から気になっていたAkka HTTPの内部構造について、この機会に調べましたので紹介いたします。

Akka HTTPとは

Akka HTTPは、Lightbend社によって開発されている、Scala/Java用のHTTP toolkitです。

現在はメンテナンスが終了したsprayの後継と位置付けられており、特徴的なRouting DSLをsprayから受け継いでいます。

また、Play Frameworkは2.6系より、Akka HTTPをデフォルトのバックエンドとして採用しています。

調査のきっかけ

Routing DSLを始めとしたAkka HTTPのAPIは、シンプルかつ高いComposability(組み立て可能性。小さく再利用しやすい構成要素を複数組み合わせて使えること)を提供する一方、その抽象度の高さから、低レイヤーでのHTTPリクエストの処理の仕組みまで含めて理解するには、若干のハードルがあるように感じていました。

このことは、例えば高負荷時におけるパフォーマンスの問題の調査などで問題となりえます。

これが、今回Akka HTTPについて調べることにしたきっかけです。

前提

以下のバージョンのソースコードを対象とします。

また、この記事で着目するのはAkka HTTP Serverのみとし、Client-Sideは対象外とします。

なお、Scala、Akka、およびjava.nioに関する基本的な解説は省略します。

はじめに

この記事では、次のような仕様を持つ小さなサンプルコードを例に、仕組みを追っていくことにします。

  • GETリクエストが来たら、リクエストURIを返す。
  • PUTリクエストが来たら、”PUT”を返す。
package com.example

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

object Main {

  def main(args: Array[String]): Unit = {

    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    //========
    // (1) Routing DSL
    //========
    val route: Route = get {
      extractUri { uri =>
        complete(uri.toString())
      }
    } ~ put {
      complete("PUT")
    }

    //========
    // (2) HTTPサーバー起動
    //========
    Http().bindAndHandle(route, "localhost", 8080)
  }
}

Routing DSL

公式ドキュメント:Directives

サンプルコードの(1)の部分では、Routing DSLを使ってRouteを構成しています。

RouteRequestContext => Future[RouteResult]型エイリアスであり、いわゆる普通のリクエストハンドラー的なシグネチャです。

getextractUri、およびputは、Directive[L]のインスタンスです。

Directive[L]は大雑把にいえば(L => Route) => Routeというシグネチャを持つ関数のようなものです。

例えばextractUriDirective1[Uri]のインスタンスであり、Uri => Routeを受け取ってRouteを返します。

extractUriが行うのは、Uriをリクエストから抽出してinner Uri => Routeに渡すことだけです。

以下はBasicDirectives.scala#L154の記述です。

def textract[L: Tuple](f: RequestContext ⇒ L): Directive[L] =
  Directive { inner ⇒ ctx ⇒ inner(f(ctx))(ctx) }

一方、getおよびputDirective[Unit]のインスタンスで、以下のように条件に応じて異なるDirectiveを返すDirectiveです(ややこしい)。

  • リクエストメソッドが一致した場合は、inner => Routeをそのまま評価するDirective
  • リクエストメソッドが一致しない場合は、「innerを評価せずにRouteResult.Rejectedを返すRoute」を返すDirective

以下はMethodDirectives.scala#L83の記述です。

def method(httpMethod: HttpMethod): Directive0 =
  extractMethod.flatMap[Unit] {
    case `httpMethod` ⇒ pass
    case _            ⇒ reject(MethodRejection(httpMethod))
  } & cancelRejections(classOf[MethodRejection])

そして、(1)のコードではgetputによる2つのRouteを~で結合して、1つのRouteを作っています。

~は、次のような動きを持つRouteのメソッドです(正確にはenrich my libraryパターンで足されたメソッドですが)。

  • レシーバーのRouteを評価した結果がRouteResult.Completeならそれを返す。
  • RouteResult.Rejectedなら引数のRouteを評価した結果を返す。

このように、Directiveをネストさせたり結合したりすることで、リクエストハンドラーを直感的に組み立てられるようになっています。

HTTPサーバー起動の仕組み

次に、HTTPリクエストを受け付けてリクエストハンドラーに渡すサーバー部分について追っていきます。

サンプルコードの(2)の部分は、Http()によって、Akka ExtensionsであるHttpExtを取得し、bindAndHandleでハンドラーを渡してサーバーを起動するコードです。

bindAndHandleの実装を参照すると、次のようにネストしたAkka Streams Graphが存在することがわかります。

akka_http_streams.png
  • Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]]
    • TCPコネクション確立毎にpushするSource
  • Tcp.IncomingConnection#flow: Flow[ByteString, ByteString, NotUsed]
    • TCPコネクションに対してデータの送受信を行うFlow

公式ドキュメント:Working with streaming IO

Tcp.IncomingConnection#flowは生のTCPレイヤーなので、RouteでハンドリングできるようにfullLayer(HTTP/TLSレイヤー)と接続されていますが、今回はfullLayerのことは置いておきます)

これらのGraphがどのように作られているかを見れば、Akka HTTPの仕組みについておおまかに理解できるはずです。

ところで、Akka(HTTP)はjava.nioによるIO多重化を用いており、低レイヤーでやっていること自体は、下記のような典型的なTCPサーバーと同様です。

package com.example

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

import scala.collection.JavaConverters._

object EchoServer {

  def main(args: Array[String]): Unit = {

    //========
    // ソケットを監視するSelectorをopen
    //========
    val selector = Selector.open()

    //========
    // ポートをListenするChannelをopen
    //========
    val listeningChannel = ServerSocketChannel
      .open()
      .bind(new InetSocketAddress(args.head.toInt))
    listeningChannel.configureBlocking(false)

    listeningChannel
      .register(selector, SelectionKey.OP_ACCEPT)

    try {
      //========
      // イベントループ
      //========
      while (selector.select() > 0) {
        val keys = selector.selectedKeys()

        keys.asScala.foreach { key =>
          if (key.isAcceptable) {
            val connectionChannel = listeningChannel.accept()

            connectionChannel
              .configureBlocking(false)
              .register(key.selector(), SelectionKey.OP_READ)
          } else {
            val channel = key.channel().asInstanceOf[SocketChannel]

            if (key.isReadable) {
              val byteBuffer = ByteBuffer.allocate(4096)
              channel.read(byteBuffer)
              byteBuffer.flip()

              channel.write(byteBuffer)
            }
          }
        }

        keys.clear()
      }
    } finally {
      selector.keys().asScala.foreach(_.channel().close())
    }
  }
}

つまり、Akkaでも次のような処理を行なっている箇所が存在します。

  • ソケットを監視するSelectorのopen
  • Selectorを使って監視を行うイベントループ
    • acceptableな場合に、Listening Channelをacceptする部分
    • Channelがreadable/writableな場合に、データの送受信を行う部分

それがActor based TCP handlingです。

上記を念頭に置きつつ、まずSource[Tcp.IncomingConnection, Future[Tcp.ServerBinding]]から掘り下げていきます。

このSourceは、カスタムGraphStageであるConnectionSourceStageから作られています(参照:Tcp.scala#L111)。

IO(IoTcp)(system)によってTcpManager Actorが生成され、この時点で、ソケットの監視を行うSelectorが以下の流れで作成されます。

  1. TcpManagerの子ActorとしてSelectionHandler Actorが生成される
  2. SelectionHandlerregistryメンバーとして、ChannelRegistryImplが生成される。
  • ここで、Selectorがopenされ、イベントループが開始する。

次に、ConnectionSourceStageGraphStageLogicを見ると、preStart hookでTcpManager ActorへTcp.Bindメッセージを送信しています。

これをトリガーとして、次のようにTCP Listenerが形成されます。

  1. TcpManagerTcp.Bindを受けて、子ActorであるSelectionHandlerWorkerForCommandを送信し、SelectionHandlerはそれを受けて、自身の子ActorとしてTcpListener Actorを生成する。
  2. TcpListener生成時に、Listening ChannelのopenおよびInetSocketAddressへのbindと、channelRegistryへ登録(つまりSelectorへの登録)を行い、TCPコネクションを受け付けることができるようになる。

ここでChannelRegistryImpl#registerに着目すると、SelectionKey#attachmentにActorを格納していることがわかります。

これにより、イベントループ中でActorを取り出し、Actorを介してイベントのハンドリングを行うことができます。

ここまでで、Selectorの生成とイベントループの開始およびコネクションの待ち受けを行う流れが把握できました。

TCPコネクション開始後

Listening Channelがacceptableになると、以下のフローでTcp.IncomingConnectionが生成され、ConnectionSourceStageの出力となります。

  1. ChannelRegistryImplのイベントループでOP_ACCEPTを検出しTcpListener ActorへChannelAcceptableを送信する。
  2. TcpListenerはそれを受けてacceptしたのちにSelectionHandler ActorへWorkerForCommandを送信する。
  3. SelectionHandlerWorkerForCommandを受け、自身の子ActorとしてTcpIncomingConnection Actorを生成する。
  4. TcpIncomingConnectionインスタンス生成時にregistryへの登録を行い、ChannelRegistryImplTcpIncomingConnectionに対してChannelRegistrationメッセージを送り返す。
  5. TcpIncomingConnectionはそれを受けて、ConnectionSourceStageのlogicにConnectedメッセージを送信した後、送受信待ちへ移行する。
  6. ConnectionSourceStageLogicは、IncomingConnectionStage GraphからFlow[ByteString, ByteString]およびTcp.IncomingConnectionを生成し、ConnectionSourceStageの出力としてpushする。

例えば2つのTCPコネクションがある場合、Actor treeは以下のようになります。

system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 4 children
⌊-> IO-TCP RepointableActorRef class akka.io.TcpManager status=0 1 children
    ⌊-> selectors RoutedActorRef class akka.routing.RouterPoolActor status=0 1 children
        ⌊-> $a LocalActorRef class akka.io.SelectionHandler status=0 3 children
            ⌊-> 0 LocalActorRef class akka.io.TcpListener status=0 no children
            ⌊-> 12 LocalActorRef class akka.io.TcpIncomingConnection status=2 no children
            ⌊-> 13 LocalActorRef class akka.io.TcpIncomingConnection status=0 no children

これが、コネクションをacceptし、Source[Tcp.IncomingConnection]の出力を生成するまでのフローです。

イベントループを用いたノンブロッキング処理を、Akka Actorによってうまくハンドリングしている印象です。

まとめ

今回、Akka HTTPをjava.nioのレベルまで掘り下げて処理の流れを一通り追ったことで、トラブルシュートの際に有用な知見が得られたと感じています。

加えて、Play FrameworkもAkka HTTPを採用しているため、Play Frameworkを深く理解する際にも今回調べたことが役立つ気がします。

明日はKagayaさんによる「Redis Lua scriptingをatomicな処理とcache stampede対策に使った話」です。お楽しみに!

参考

Related Post