大家好,我是 Server-side Engineer 的 Allen。想跟大家分享今年在 JCConf 的所見所聞。
觀察這次活動的議程,有許多在推廣 Java 15、以及 Kotlin 的議程也不少,Android 使用 Kotlin 不用說,Kotlin 用在 Backend 的開發也是越來越流行!
The RSocket revolution - Josh Long
這次想介紹的是來自 Pivotal 的 Josh Long 所介紹的 RSocket。相信有使用 Spring Boot 的同學都很熟悉這個名字。RSocket 是一個 TCP based 的 protocol,跟 HTTP 相比,他支援一些長連線 / 雙向溝通的模式,跟 gRPC 一樣很適合用於 Micro services 之間的通訊。更令人心動的是 RSocket 的 R 是 reactive 的 R!而且,在官方的 RSocket Java implementation 中,他跟常用的 reactive programming 框架: Spring Reactor & RxJava (v2 以上) 一樣實作 Java 9 所提出的 Reactive Stream interface (Publisher, Subscriber, Subscription & Processor)。這代表發佈或者消費 RSocket 資料流時能夠使用 Spring Reactor 或者 RxJava 來操作。
以下除了 Josh Long 分享的內容外,還包含作者自己的註解與補充內容,如有錯誤請不吝指教 <(_ _)>
(圖) Josh 於 2020/9/9 在 Denver Java User Group 線上分享:The RSocket Revolution (來源:Denver Java User Group Youtube)
Reactive Programming
Josh 首先介紹 Reactive Programming: Reactive Programming 可視為一種 programming paradigm (程式設計典範?),旨在處理資料流的轉換、聚合等。用 wiki 查到的例子來說明:
a = b + c
在大家熟悉的 imperative programming 中,這個 operation 是一次性的,代表 a 被 assign b+c 的值;但是如果想讓 a 隨著 b 和 c 的值變動而跟著變化該怎麼辦?這時就很適合使用 reactive programming。
在 reactive programming 中,b, c 隨著時間變化可想像成各自的資料流,而 "+" 這個 operation 則會跟著 b, c 的變化而輸出資料流 a,以 RxJava 舉例的話:
sec | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
b | 1 | 1 | 2 | 2 | 3 |
c | 1 | 3 | 5 | 7 | 9 |
a | 2 | 4 | 7 | 9 | 12 |
val a: List<Int> = Flowable.zip(
Flowable.fromIterable(listOf(1, 1, 2, 2, 3)), // b
Flowable.fromIterable(listOf(1, 3, 5, 7, 9)), // c
{ b: Int, c: Int -> b + c }
).toList().blockingGet()
// a is listOf(2, 4, 7, 9, 12)
而使用 Reactive Programming 有甚麼好處呢?Josh 提到以下幾點:
- Resource efficient: 在常見的 reactive programming 框架中,task 與 thread 是分開考慮的,因此可以讓許多 task 共享固定的運算資源 (e.g. thread pool),類似 Coroutine 所帶來的好處
- Secured/robust codes: 由於常見的 reactive programming 框架都套用 functional programming 的設計概念,code 會更好讀
- Back pressure (inspired by TCP/UDP): 當下游的邏輯處理不完,可以讓上游邏輯跟著降低產生 data stream 的速度 (或者其他 back pressure 的策略,例如 sampling)
RSocket with Spring Boot
接下來 Josh 親手使用 Spring Boot 的 RSocket library 示範四種 RSocket 的通訊模式,因為 Josh 當場 coding 的手速太快了,請允許小弟我用 RxJava 重刻一次當作範例 XD
RSocket 支援以下四種通訊模式: request & response, request stream, fire & forget, channel。以下逐一介紹並附上 RxJava 的 code
Request & Response
- 一個 Request 只能得到一個 Response
- 可視為長度只有 1 的 Stream
- 完成後即結束 TCP 連線
RSocket controller
@Controller
class RequestResponseController {
@MessageMapping("helloReqResp")
fun getHelloData(greeting: String): Single<HelloData> {
logger.info { "greeting received: $greeting" }
return Single.just(HelloData(message = "hello!"))
}
companion object : KLogging() {}
}
data class HelloData(val message: String)
Request & Response 模式需在 handler function 定義一個參數,並且回傳一個 Reactive data type (RxJava 使用 Single、Reactor 使用 Mono)
RSocket command line client
啟動 server 後,可使用以下指令測試:
rsc tcp://localhost:7000 -route helloReqResp --request -d hi! --debug
Request Stream
- 有限長度的 stream
- 在 stream 結束前不會斷開 TCP 連線
- 支援 back pressure
- 適合單向傳輸大量資料
RSocket controller
@Controller
class RequestStreamController {
@MessageMapping("requestStream")
fun getStreamData(request: String): Flowable<Msg> {
logger.info("request is $request")
return Flowable
.fromIterable(0..1000)
.map {
logger.info { it }
Msg(message = it.toString())
}
}
companion object : KLogging()
}
data class Msg(val message: String)
Request stream 模式需要在 handler function 給一個參數,回傳 reactive data type (RxJava: Flowable; Reactor: Flux)
RSocket command line client
啟動 server 後,可使用以下指令測試:
rsc tcp://localhost:7000 -route requestStream --stream -d hi! --debug
Fire & forget
- 不會有 response
- 成功送出 request 就斷開 TCP 連線
RSocket controller
@Controller
class FireAndForgetController {
@MessageMapping("fireAndForget")
fun getStreamData(request: String) {
logger.info("request is: $request")
}
companion object : KLogging()
}
Fire-and-forget 模式需要在 handler function 輸入一個參數,但沒有回傳值
RSocket command line client
啟動 server 後,可使用以下指令測試:
rsc tcp://localhost:7000 -route fireAndForget --fnf -d hello
Channel
- 雙向的 (bi-directional) stream
- 在 stream 結束之前不會斷開 TCP 連線。可當作長連線版本的 request & response,差別是可以在同一個連線傳送多個 request /response
- 適合雙向傳輸大量資料
- 支援 back pressure
RSocket controller
@Controller
class ChannelController {
@MessageMapping("channel")
fun getStreamData(requestFlow: Flowable<String>): Flowable<String> {
return requestFlow.map { it.toUpperCase() }
}
}
Channel 模式需要在 handler function 輸入 reactive data type (RxJava: Flowable, Reactor: Flux),並且回傳另一個 reactive data type (RxJava: Flowable, Reactor: Flux)
RSocket command line client
啟動 server 後,可使用以下指令測試:
rsc tcp://localhost:7000 -route channel --channel -d -
Back pressure with RSocket
在聽了 Josh 的演講後,回家便很想試試看 RSocket 是否真的能在 application level 做到 back pressure?
為了驗證,以下使用 request stream 模式做實驗:當 stream consumer 速度較慢時,stream producer 能不能降低 produce 的速度?
先在 RSocket server 上寫一個跟上面 Example 類似的 Request stream controller:
@Controller
class RequestStreamController {
@MessageMapping("requestStream")
fun getStreamData(request: String): Flowable<Foo> {
logger.info("request is $request")
return Flowable
.fromIterable(0..1000)
.map {
logger.info { it }
Foo(message = it.toString())
}
}
companion object : KLogging()
}
data class Foo(val message: String)
接著寫一個很慢的 stream consumer 當作 RSocket client:
@RestController
class RestToStreamController(
private val rsocketRequester: RSocketRequester
) {
@GetMapping("/getStreamCount")
fun getStreamCount(): Single<Int> {
return fluxToFlowable( // use ReactorAdapter to transform Flux(Reactor type) to Flowable(RxJava type)
rsocketRequester
.route("requestStream")
.data("request message")
.retrieveFlux(Foo("dummy").javaClass)
)
.observeOn(Schedulers.io())
.flatMap ({ element ->
logger.info { element.message }
Thread.sleep(100L)
Flowable.just(element)
}, false, 1) // limit concurrency to 1
.toList().map { it.size }
}
companion object : KLogging()
}
class Foo(@JsonProperty("message") val message: String)
這個 REST controller 會對 RSocket server 發出 stream request,接著會開始 consume stream,consume 完成後回傳總共 consume 了多少個 element。
我們限制一次只能 consume 一個 element,且在 consume 每一個 stream 之後都 sleep 100ms, 觀察 RSocket server 的 produce 速度是否跟著降低 (log 印比較慢) ?
實驗結果發現:server produce 的速度確實有跟著降低!所以透過 RSocket protocol,我們可以實現 end-to-end 的 back pressure。
Summary
- RSocket 為 TCP based protocol,適合 server component 之間的通訊
- RSocket Java 實作 Reactive Streams,支援 back pressure
- RSocket 有四種模式: request & response, request stream, fire-and-forget, channel,可依照情境選擇
其他
由於 Java 近幾年來更新頻繁,這回 JCConf 也看到許多推廣新版 Java 的議程,例如以下兩個 Session 都有介紹 Java 15 的新功能:
- Moved by Java: 25 Years of Innovation - Chad Arimura
- New Java Features Released in 2020 - Joseph Kuo
新版 Java (Java 15) 有許多新的特性與優點:
- 隨著 containerize 環境的興起,每次改版,JVM 對於 container 環境的 memory 管理都有改進 (e.g. Java 10 之後,JVM 可以取得 docker container 設定的 CPU/memory limit)
- Java 15 的預設 GC algorithhm 改成 ZGC。ZGC 從 experimental feature 正式成為成為 production feature,號稱 JVM pause time 縮短非常多
- 許多更新潮的語言 feature (如 sealed class, instanceof type matching, etc.)
而會場外還有白板在統計大家的 tech stack,其中 JVM 版本使用 Java 8 佔了 55%、Java 11 有 45%,Joseph Kuo 也建議還在用舊版 Java 的大家可以盡快跟上新版本。
2021 年九月會推出 Java 17,將會是 LTS (Long-term support) 的版本,推薦大家有新專案可以勇敢的嘗試!
對於開發者來說,有增廣見聞的機會比起獨自摸索會更有效率,這次來 JCConf 除了不用請假外還省下門票錢,實在非常感謝敝社 LINE 的贊助!
Reference
- JCConf 2020 - The RSocket revolution - Josh Long
- Josh Long - The RSocket revolution: https://www.youtube.com/watch?v=Kt0LeN3TrkM&ab_channel=DenverJavaUsersGroup
- Premiering - The RSocket revolution at Spring Blog: https://spring.io/blog/2020/08/13/premiering-the-rsocket-revolution
- Java Reactive Streams: http://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/
- Example code: https://github.com/allenlee820202/rsocket-demo
- Reactive Programming wiki: https://en.wikipedia.org/wiki/Reactive_programming
- RSocket client cli (RSC): https://github.com/making/rsc
- RSocket using Spring Boot: https://www.baeldung.com/spring-boot-rsocket
- New Java Features released in 2020 by Joseph Kuo: https://www2.slideshare.net/CyberJos/jcconf-2020-new-java-features-released-in-2020?fbclid=IwAR17cdH2kDOeBd7cVhoE31MsnmrYTKeeUwIsTjHAZPoamPT2PBEeCClAn1w
- JVM Memory Handling for Dockers: https://medium.com/@madhupathy/jvm-memory-handling-for-java-based-dockerized-microservices-7568c16f1e65