LINE Corporation 於2023年10月1日成爲 LY Corporation。LY Corporation 的新部落格在這裏。LY Corporation Tech Blog

Blog


JCConf 2020 大會心得分享 - RSocket 革命,為了 Reactive Programming 而生的高效率通訊協定

大家好,我是 Server-side Engineer 的 Allen。想跟大家分享今年在 JCConf 的所見所聞。

觀察這次活動的議程,有許多在推廣 Java 15、以及 Kotlin 的議程也不少,Android 使用 Kotlin 不用說,Kotlin 用在 Backend 的開發也是越來越流行!

The RSocket revolution - Josh Long

這次想介紹的是來自 Pivotal 的 Josh Long 所介紹的 RSocket。相信有使用 Spring Boot 的同學都很熟悉這個名字。RSocket 是一個 TCP based 的 protocol,跟 HTTP 相比,他支援一些長連線 / 雙向溝通的模式,跟 gRPC 一樣很適合用於 Micro services 之間的通訊。更令人心動的是 RSocket 的 R 是 reactive 的 R!而且,在官方的 RSocket Java implementation 中,他跟常用的 reactive programming 框架: Spring Reactor & RxJava (v2 以上) 一樣實作 Java 9 所提出的 Reactive Stream interface (Publisher, Subscriber, Subscription & Processor)。這代表發佈或者消費 RSocket 資料流時能夠使用 Spring Reactor 或者 RxJava 來操作。

以下除了 Josh Long 分享的內容外,還包含作者自己的註解與補充內容,如有錯誤請不吝指教 <(_ _)>

(圖) Josh 於 2020/9/9 在 Denver Java User Group 線上分享:The RSocket Revolution (來源:Denver Java User Group Youtube)

Reactive Programming

Josh 首先介紹 Reactive Programming: Reactive Programming 可視為一種 programming paradigm (程式設計典範?),旨在處理資料流的轉換、聚合等。用 wiki 查到的例子來說明:

a = b + c

在大家熟悉的 imperative programming 中,這個 operation 是一次性的,代表 a 被 assign b+c 的值;但是如果想讓 a 隨著 b 和 c 的值變動而跟著變化該怎麼辦?這時就很適合使用 reactive programming。
在 reactive programming 中,b, c 隨著時間變化可想像成各自的資料流,而 "+" 這個 operation 則會跟著 b, c 的變化而輸出資料流 a,以 RxJava 舉例的話:

sec12345
b11223
c13579
a247912
val a: List<Int> = Flowable.zip(
    Flowable.fromIterable(listOf(1, 1, 2, 2, 3)), // b
    Flowable.fromIterable(listOf(1, 3, 5, 7, 9)), // c
    { b: Int, c: Int -> b + c }
).toList().blockingGet()
 
// a is listOf(2, 4, 7, 9, 12)

而使用 Reactive Programming 有甚麼好處呢?Josh 提到以下幾點:

  • Resource efficient: 在常見的 reactive programming 框架中,task 與 thread 是分開考慮的,因此可以讓許多 task 共享固定的運算資源 (e.g. thread pool),類似 Coroutine 所帶來的好處
  • Secured/robust codes: 由於常見的 reactive programming 框架都套用 functional programming 的設計概念,code 會更好讀
  • Back pressure (inspired by TCP/UDP): 當下游的邏輯處理不完,可以讓上游邏輯跟著降低產生 data stream 的速度 (或者其他 back pressure 的策略,例如 sampling)

RSocket with Spring Boot

接下來 Josh 親手使用 Spring Boot 的 RSocket library 示範四種 RSocket 的通訊模式,因為 Josh 當場 coding 的手速太快了,請允許小弟我用 RxJava 重刻一次當作範例 XD

RSocket 支援以下四種通訊模式: request & response, request stream, fire & forget, channel。以下逐一介紹並附上 RxJava 的 code

Request & Response

  • 一個 Request 只能得到一個 Response
  • 可視為長度只有 1 的 Stream
  • 完成後即結束 TCP 連線

RSocket controller

@Controller
class RequestResponseController {
    @MessageMapping("helloReqResp")
    fun getHelloData(greeting: String): Single<HelloData> {
        logger.info { "greeting received: $greeting" }
        return Single.just(HelloData(message = "hello!"))
    }
 
    companion object : KLogging() {}
}
 
data class HelloData(val message: String)

Request & Response 模式需在 handler function 定義一個參數,並且回傳一個 Reactive data type (RxJava 使用 Single、Reactor 使用 Mono)

RSocket command line client

啟動 server 後,可使用以下指令測試:

rsc tcp://localhost:7000 -route helloReqResp --request -d hi! --debug

Request Stream

  • 有限長度的 stream
  • 在 stream 結束前不會斷開 TCP 連線
  • 支援 back pressure
  • 適合單向傳輸大量資料
RSocket controller
@Controller
class RequestStreamController {
    @MessageMapping("requestStream")
    fun getStreamData(request: String): Flowable<Msg> {
        logger.info("request is $request")
        return Flowable
                .fromIterable(0..1000)
                .map {
                    logger.info { it }
                    Msg(message = it.toString())
                }
    }
    companion object : KLogging()
}
 
data class Msg(val message: String)

Request stream 模式需要在 handler function 給一個參數,回傳 reactive data type (RxJava: Flowable; Reactor: Flux)

RSocket command line client

啟動 server 後,可使用以下指令測試:

rsc tcp://localhost:7000 -route requestStream --stream -d hi! --debug

Fire & forget

  • 不會有 response
  • 成功送出 request 就斷開 TCP 連線
RSocket controller
@Controller
class FireAndForgetController {
    @MessageMapping("fireAndForget")
    fun getStreamData(request: String) {
        logger.info("request is: $request")
    }
 
    companion object : KLogging()
}

Fire-and-forget 模式需要在 handler function 輸入一個參數,但沒有回傳值

RSocket command line client

啟動 server 後,可使用以下指令測試:

rsc tcp://localhost:7000 -route fireAndForget --fnf -d hello

Channel

  • 雙向的 (bi-directional) stream
  • 在 stream 結束之前不會斷開 TCP 連線。可當作長連線版本的 request & response,差別是可以在同一個連線傳送多個 request /response
  • 適合雙向傳輸大量資料
  • 支援 back pressure
RSocket controller
@Controller
class ChannelController {
    @MessageMapping("channel")
    fun getStreamData(requestFlow: Flowable<String>): Flowable<String> {
        return requestFlow.map { it.toUpperCase() }
    }
}

Channel 模式需要在 handler function 輸入 reactive data type (RxJava: Flowable, Reactor: Flux),並且回傳另一個 reactive data type (RxJava: Flowable, Reactor: Flux)

RSocket command line client

啟動 server 後,可使用以下指令測試:

rsc tcp://localhost:7000 -route channel --channel -d -

Back pressure with RSocket

在聽了 Josh 的演講後,回家便很想試試看 RSocket 是否真的能在 application level 做到 back pressure?
為了驗證,以下使用 request stream 模式做實驗:當 stream consumer 速度較慢時,stream producer 能不能降低 produce 的速度?

先在 RSocket server 上寫一個跟上面 Example 類似的 Request stream controller:

@Controller
class RequestStreamController {
    @MessageMapping("requestStream")
    fun getStreamData(request: String): Flowable<Foo> {
        logger.info("request is $request")
        return Flowable
                .fromIterable(0..1000)
                .map {
                    logger.info { it }
                    Foo(message = it.toString())
                }
    }
 
    companion object : KLogging()
}
 
data class Foo(val message: String)

接著寫一個很慢的 stream consumer 當作 RSocket client:

@RestController
class RestToStreamController(
        private val rsocketRequester: RSocketRequester
) {
    @GetMapping("/getStreamCount")
    fun getStreamCount(): Single<Int> {
        return fluxToFlowable( // use ReactorAdapter to transform Flux(Reactor type) to Flowable(RxJava type)
                rsocketRequester
                        .route("requestStream")
                        .data("request message")
                        .retrieveFlux(Foo("dummy").javaClass)
        )
                .observeOn(Schedulers.io())
                .flatMap ({ element ->
                    logger.info { element.message }
                    Thread.sleep(100L)
                    Flowable.just(element)
                }, false, 1) // limit concurrency to 1
                .toList().map { it.size }
 
    }
 
    companion object : KLogging()
}
 
class Foo(@JsonProperty("message") val message: String)

這個 REST controller 會對 RSocket server 發出 stream request,接著會開始 consume stream,consume 完成後回傳總共 consume 了多少個 element。

我們限制一次只能 consume 一個 element,且在 consume 每一個 stream 之後都 sleep 100ms, 觀察 RSocket server 的 produce 速度是否跟著降低 (log 印比較慢) ?

實驗結果發現:server produce 的速度確實有跟著降低!所以透過 RSocket protocol,我們可以實現 end-to-end 的 back pressure。

Summary

  • RSocket 為 TCP based protocol,適合 server component 之間的通訊
  • RSocket Java 實作 Reactive Streams,支援 back pressure
  • RSocket 有四種模式: request & response, request stream, fire-and-forget, channel,可依照情境選擇

其他

由於 Java 近幾年來更新頻繁,這回 JCConf 也看到許多推廣新版 Java 的議程,例如以下兩個 Session 都有介紹 Java 15 的新功能:

  • Moved by Java: 25 Years of Innovation - Chad Arimura
  • New Java Features Released in 2020 - Joseph Kuo

新版 Java (Java 15) 有許多新的特性與優點:

  • 隨著 containerize 環境的興起,每次改版,JVM 對於 container 環境的 memory 管理都有改進 (e.g. Java 10 之後,JVM 可以取得 docker container 設定的 CPU/memory limit)
  • Java 15 的預設 GC algorithhm 改成 ZGC。ZGC 從 experimental feature 正式成為成為 production feature,號稱 JVM pause time 縮短非常多
  • 許多更新潮的語言 feature (如 sealed class, instanceof type matching, etc.)

而會場外還有白板在統計大家的 tech stack,其中 JVM 版本使用 Java 8 佔了 55%、Java 11 有 45%,Joseph Kuo 也建議還在用舊版 Java 的大家可以盡快跟上新版本。

2021 年九月會推出 Java 17,將會是 LTS (Long-term support) 的版本,推薦大家有新專案可以勇敢的嘗試!

<img src="https://engineering.linecorp.com/wp-content/uploads/2020/12/0918_4-1024x768.jpg">
(圖) 會場白板的 Tech Stack 大亂鬥 (來源:JCConf.tw FB)

對於開發者來說,有增廣見聞的機會比起獨自摸索會更有效率,這次來 JCConf 除了不用請假外還省下門票錢,實在非常感謝敝社 LINE 的贊助!

Reference