Adding experimental WebAssembly support to Decaton – Part 1

WebAssembly (Wasm) is a binary instruction format for a stack-based virtual machine. It aims to be a compilation target for many programming languages, so that programs can run anywhere, from browsers to embedded devices and servers, with high portability, high performance, and sandboxing.

Wasm recently caught my interest, along with the Rust programming language, and I came up with the idea of integrating it with Decaton, a Kafka consumer framework that we develop and maintain. One key benefit is obvious. Decaton offers many unique features that make it easy to develop a high-performance Kafka consumer, but it can only be used from programming languages that run on the JVM. This is a real concern at LINE. Java is the language used most for service development, but many other languages are also adopted, such as Go, JavaScript (Node.js), and Python. If we support WebAssembly functions as DecatonProcessor implementations, users can write a processor in any language and run it on Decaton!

Java language binding for Wasmtime

As a Wasm runtime, I selected Wasmtime simply because it’s one of the representative WebAssembly runtime implementations. Wasmtime is written in Rust, but it offers a C API, so technically it can be embedded into any language by writing an FFI binding. Out of interest, I started developing its Java language binding, wasmtime-java, as a personal project. That way I could use it to access the Wasmtime API from Decaton, which is written in Java.

Writing a JNI implementation in Rust is fairly easy thanks to the jni crate, which provides access to JNI APIs through Rust types. With it, I didn’t even need Wasmtime’s C API and could use its Rust API instead, which is much more direct and flexible.

Compiling a Rust program into Wasm

Compiling code into Wasm in Rust is surprisingly easy. Here’s a very small example:

$ cat hello-wasm.rs
#[no_mangle]
pub extern "C" fn add(a: i32, b: i32) -> i32 {
    a + b
}
# Compile it for wasm target
$ rustc --crate-type cdylib --target wasm32-unknown-unknown hello-wasm.rs
# Invoke the exported function with the wasmtime command
$ wasmtime hello_wasm.wasm --invoke add 1 2
3

Let’s try another example which is closer to the typical “Hello World.”

$ cat io-wasm.rs
#[no_mangle]
pub extern "C" fn run() {
    println!("Hello Wasm!");
}
$ rustc --crate-type cdylib --target wasm32-unknown-unknown io-wasm.rs
$ wasmtime io_wasm.wasm --invoke run
$

Contrary to what we might expect, nothing is printed to the terminal. On the wasm32-unknown-unknown target, Rust’s standard library silently discards anything written to stdout.

Wasm is designed to be portable and to provide a sandboxed execution environment. For both reasons, a compiled Wasm program cannot invoke the host’s system calls, which are platform-dependent and highly privileged. By default, most Wasm programs cannot access any resources outside the linear memory they run in: no filesystems, networks, host memory, or other hardware. This makes Wasm nearly useless on server systems, which in most cases require I/O with external systems.

WASI – WebAssembly System Interface

WASI (WebAssembly System Interface) has been proposed to address this problem. It builds a new standardized layer, similar to system calls, that is universally accessible from many different WebAssembly runtimes.

As far as I’ve checked, few programming languages support WASI as a target platform at the time of writing, but at least Rust and C/C++ do.

Rust supports the wasm32-wasi target in addition to wasm32-unknown-unknown, and the cargo-wasi command makes building a project for the wasm32-wasi target very easy. Here’s how to scaffold a Rust/Wasm project that uses WASI.

$ cargo new --lib decaton-wasm-processor
     Created library `decaton-wasm-processor` package
$ cd decaton-wasm-processor
### Edit src/lib.rs ###
$ cat <<'EOF' >> Cargo.toml
[lib]
crate-type = ['cdylib']
EOF
$ cargo wasi build
...
$ file target/wasm32-wasi/debug/decaton_wasm_processor.wasm
target/wasm32-wasi/debug/decaton_wasm_processor.wasm: WebAssembly (wasm) binary module version 0x1 (MVP)

src/lib.rs is edited to contain the following:

#[no_mangle]
pub unsafe extern "C" fn run() {
    println!("Hello through WASI!");
}

Once built, the resulting Wasm module is written under ./target/wasm32-wasi/debug. We can inspect its contents with wasm2wat, which is part of WABT (the WebAssembly Binary Toolkit). Notice that the binary is very large. To be truly “portable,” it has to contain all the code it runs, including code from its dependencies such as the standard library.

$ wasm2wat target/wasm32-wasi/debug/decaton_wasm_processor.wasm
(module
...
  (func $run (type 0)
    (local i32 i32 i32 i32 i32 i32 i32 i32 i32 i32 i32 i32 i32 i32 i32 i32 i32)
    global.get 0
    local.set 0
    i32.const 32
    local.set 1
    local.get 0
  ...

As with the previous example, you can test the generated Wasm binary with the wasmtime command.

$ wasmtime target/wasm32-wasi/debug/decaton_wasm_processor.wasm --invoke run
Hello through WASI!

Add WebAssembly processor to Decaton

Now it’s time to implement WebAssembly processor support on Decaton.

Use linear memory for handing over complex data

Before implementing WebAssembly support, we have to think about one important thing: interoperability, namely how to pass task data from Decaton to the Wasm processor.

For a Wasm function to process a consumed task, we need to pass it the task’s data. This is currently not straightforward. Wasm functions accept only a limited set of primitive numeric types, such as i32, i64, f32, and f64, as parameters and return values. Language-dependent types such as structs or classes are not supported. Collection types such as byte arrays cannot be passed to or returned from Wasm functions directly either. So to pass task data, which at its lowest level is a byte array, we need a trickier approach.

The idea is to pass a pointer-like numeric value to or from the Wasm function. The host (Decaton) then reads or writes the target data directly in the WebAssembly guest’s memory at that address. This is enabled by one of the Wasm module’s “exports,” which has the memory type and is conventionally named “memory.”
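To make the idea concrete, here is a minimal sketch in plain Rust (std only, no Wasm runtime) that models a guest’s linear memory as a byte vector. `LinearMemory` and `MemError` are hypothetical names for illustration. A “pointer” exchanged with the guest is just an offset into this array. An out-of-range access is reported as an error, loosely mirroring how a Wasm runtime traps on an out-of-bounds memory access.

```rust
/// Hypothetical model of a Wasm guest's linear memory: a flat, byte-addressed array.
struct LinearMemory {
    bytes: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum MemError {
    /// The requested range [addr, addr + len) falls outside the memory.
    OutOfBounds { addr: usize, len: usize },
}

impl LinearMemory {
    fn new(size: usize) -> Self {
        LinearMemory { bytes: vec![0; size] }
    }

    /// Returns the range [addr, addr + len) if it fits inside the memory.
    fn range(&self, addr: usize, len: usize) -> Result<std::ops::Range<usize>, MemError> {
        match addr.checked_add(len) {
            Some(end) if end <= self.bytes.len() => Ok(addr..end),
            _ => Err(MemError::OutOfBounds { addr, len }),
        }
    }

    /// Host writes `data` at the guest "pointer" `addr`.
    fn write(&mut self, addr: usize, data: &[u8]) -> Result<(), MemError> {
        let r = self.range(addr, data.len())?;
        self.bytes[r].copy_from_slice(data);
        Ok(())
    }

    /// Host reads `len` bytes starting at the guest "pointer" `addr`.
    fn read(&self, addr: usize, len: usize) -> Result<&[u8], MemError> {
        let r = self.range(addr, len)?;
        Ok(&self.bytes[r])
    }
}
```

Only the offset and the length cross the function boundary. The bytes themselves never do.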

NOTE: Interoperability is much easier when generating Wasm code that runs in a browser, thanks to wasm-bindgen.

We can confirm that the generated Wasm binary has a memory export as follows:

$ wasm2wat target/wasm32-wasi/debug/decaton_wasm_processor.wasm | grep '(memory'
  (memory (;0;) 17)
  (export "memory" (memory 0))
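In the text format, `(memory (;0;) 17)` declares memory 0 with an initial size of 17 pages. The export line exposes it under the name “memory.” Wasm memory is sized in 64 KiB pages, so the arithmetic can be sketched as follows. The helper names are hypothetical.

```rust
/// Size of one WebAssembly memory page, fixed by the spec at 64 KiB.
const WASM_PAGE_SIZE: usize = 64 * 1024;

/// Converts a page count (as in `(memory 17)`) into a byte size.
fn pages_to_bytes(pages: usize) -> usize {
    pages * WASM_PAGE_SIZE
}

/// Smallest page count that can hold `bytes` bytes (memory grows in whole pages).
fn pages_needed(bytes: usize) -> usize {
    (bytes + WASM_PAGE_SIZE - 1) / WASM_PAGE_SIZE
}
```

So this module starts with 1,114,112 bytes (about 1.06 MiB) of linear memory. A guest can enlarge it at run time with memory.grow, which always adds whole pages.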

To access the exported memory, we can use Memory#buffer() to obtain a ByteBuffer representing the memory area.

try (Memory memory = linker.getOneByName("", "memory").memory()) {
    // You can access entire Wasm memory through java.nio.ByteBuffer interface.
    ByteBuffer memBuf = memory.buffer();
    memBuf.putInt(0x1234, 7);
}
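One detail matters when writing integers this way. Wasm linear memory is little-endian, whereas a java.nio.ByteBuffer uses big-endian order unless told otherwise. Whether wasmtime-java’s Memory#buffer() already returns a little-endian buffer is not covered here, so treat that as an assumption to verify. Calling order(ByteOrder.LITTLE_ENDIAN) on the buffer would make it explicit. The sketch below again uses a plain Rust byte array as a stand-in for guest memory. It shows what a guest-side i32.load at address 0x1234 would see after each kind of write. The function names are hypothetical.

```rust
/// Writes `value` at `addr` in little-endian order, the byte order Wasm loads and stores use.
fn put_i32_le(mem: &mut [u8], addr: usize, value: i32) {
    mem[addr..addr + 4].copy_from_slice(&value.to_le_bytes());
}

/// Writes `value` at `addr` in big-endian order (java.nio.ByteBuffer's default).
fn put_i32_be(mem: &mut [u8], addr: usize, value: i32) {
    mem[addr..addr + 4].copy_from_slice(&value.to_be_bytes());
}

/// Reads an i32 the way a Wasm `i32.load` would: little-endian.
fn wasm_load_i32(mem: &[u8], addr: usize) -> i32 {
    let mut word = [0u8; 4];
    word.copy_from_slice(&mem[addr..addr + 4]);
    i32::from_le_bytes(word)
}
```

A big-endian write of 7 is read back by the guest as 0x07000000, so the byte order has to match on both sides.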

Implement WasmtonProcessor

I added a new module, wasmton, with a new DecatonProcessor implementation as follows.

public class WasmtonProcessor implements DecatonProcessor<byte[]> {
    private final Store store;
    private final Linker linker;
    private final Wasi wasi;
    private final Memory mem;
    private final Func pollTaskFunc;
    private final Func runFunc;
    private final AtomicReference<byte[]> currentTask;

    public WasmtonProcessor(String wasmPath) {
        store = new Store();
        linker = new Linker(store);
        WasiConfig config = new WasiConfig(new String[0],
                                           // Mount current dir as root for wasm processor
                                           new PreopenDir[] { new PreopenDir("./", "/") });
        wasi = new Wasi(store, config);
        wasi.addToLinker(linker);
        currentTask = new AtomicReference<>();
        AtomicReference<Memory> memRef = new AtomicReference<>();

        // poll_task extern function implementation
        pollTaskFunc = WasmFunctions.wrap(store, I64, I32, I32, (bufAddr, bufLen) -> {
            byte[] task = currentTask.getAndSet(null);
            if (task == null || bufLen < task.length) {
                return -1;
            }
            ByteBuffer buf = memRef.get().buffer();
            buf.position(bufAddr.intValue()); // Seek to the task buffer's location
            buf.put(task);
            return task.length;
        });
        linker.define("decaton", "poll_task", Extern.fromFunc(pollTaskFunc));
        try (Engine engine = store.engine();
             Module module = Module.fromFile(engine, wasmPath)) {
            linker.module("", module);
        }
        mem = linker.getOneByName("", "memory").memory();
        memRef.set(mem);
        runFunc = linker.getOneByName("", "run").func();
    }

    @Override
    public void process(ProcessingContext<byte[]> context, byte[] task) throws InterruptedException {
        currentTask.set(task);
        WasmFunctions.consumer(runFunc).accept();
    }

    @Override
    public void close() {
        runFunc.close();
        pollTaskFunc.close();
        mem.close();
        wasi.close();
        linker.close();
        store.close();
    }
}

Apart from the linear memory handling, most of this code is boilerplate for running a Wasm/WASI program. However, this processor also implements the following protocol between Decaton and the Wasm processor:

  1. The compiled processor module exports memory and a function run, which takes no parameters and returns no value.
  2. wasmton provides an extern function named poll_task under the “decaton” namespace, which the Wasm processor can import.
  3. When a new task arrives, WasmtonProcessor calls run.
  4. When run is called, the Wasm processor calls poll_task, passing the address of a buffer for the task data and the buffer’s capacity.
  5. poll_task copies the task data into the buffer at the given address and returns the data’s length. It returns -1 if there is no pending task or the buffer is too small.
  6. The Wasm processor reads the task data from the buffer, processes it, and returns from run.
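The protocol can be simulated end to end without a Wasm runtime. The sketch below is a hypothetical, std-only model:

- `Host` plays the role of WasmtonProcessor, holding the current task and the guest’s linear memory.
- `poll_task` mirrors the Java implementation by returning the task length or -1.
- `guest_run` stands in for the exported run function.

The names, buffer address, and capacity are illustrative assumptions, not part of wasmton.

```rust
/// Hypothetical stand-in for WasmtonProcessor: the pending task plus the guest's memory.
struct Host {
    current_task: Option<Vec<u8>>,
    memory: Vec<u8>,
}

impl Host {
    /// Mirrors the `decaton.poll_task` import: copy the pending task into guest memory
    /// at `buf_addr` and return its length, or -1 if there is no task or it doesn't fit.
    fn poll_task(&mut self, buf_addr: i64, buf_len: i32) -> i32 {
        let task = match self.current_task.take() {
            Some(t) => t,
            None => return -1,
        };
        if buf_len < 0 || (buf_len as usize) < task.len() {
            return -1;
        }
        let start = buf_addr as usize;
        self.memory[start..start + task.len()].copy_from_slice(&task);
        task.len() as i32
    }

    /// Mirrors WasmtonProcessor#process: stash the task, then call the guest's `run`.
    fn process(&mut self, task: &[u8]) -> Option<String> {
        self.current_task = Some(task.to_vec());
        guest_run(self)
    }
}

/// Stand-in for the guest's exported `run`: poll the task into its buffer and "process" it.
fn guest_run(host: &mut Host) -> Option<String> {
    const BUF_ADDR: usize = 1024; // hypothetical location of the guest's task buffer
    const BUF_LEN: usize = 256; // hypothetical buffer capacity
    let len = host.poll_task(BUF_ADDR as i64, BUF_LEN as i32);
    if len < 0 {
        return None; // no task, or the task is larger than the buffer
    }
    let bytes = &host.memory[BUF_ADDR..BUF_ADDR + len as usize];
    Some(String::from_utf8_lossy(bytes).into_owned())
}
```

Like the Java version, this takes the task out of the slot before the capacity check. A task that doesn’t fit is therefore dropped rather than left for a retry.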

The image below summarizes this protocol.

Decaton is a library and does not provide an entry point to run it as an executable. However, non-Java developers should be able to run their logic on Decaton without writing any Java code. For that, it’s better to provide an executable that starts a Decaton processor with a given Wasm processor implementation. I added the following main class.

public class Wasmton {
    public static void main(String[] args) throws InterruptedException {
        if (args.length < 3) {
            throw new RuntimeException("Usage: Wasmton BOOTSTRAP_SERVERS TOPIC PATH_TO_PROCESSOR_WASM");
        }
        String bootstrapServers = args[0];
        String topic = args[1];
        String wasmPath = args[2];

        Properties consumerProps = new Properties();
        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "wasmton-test");
        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "wasmton-test");
        TaskExtractor<byte[]> extractor = bytes ->
                new DecatonTask<>(TaskMetadata.builder().build(), bytes, bytes);
        ProcessorSubscription subscription = SubscriptionBuilder
                .newBuilder("wasmton-test")
                .consumerConfig(consumerProps)
                .processorsBuilder(
                        ProcessorsBuilder.consuming(topic, extractor)
                                         .thenProcess(() -> new WasmtonProcessor(wasmPath),
                                                      ProcessorScope.THREAD))
                .buildAndStart();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                subscription.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }));
    }
}

Implement a Decaton processor with Wasm/WASI

Next, let’s implement the processor side.

The first processor example does very simple work: upon receiving a task containing a message, it appends the message to the file ./messages. In the decaton-wasm-processor project, I modified lib.rs as follows:

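// Dependencies assumed in Cargo.toml: serde (with the "derive" feature), serde_json, wasi, libc
use serde::{Deserialize, Serialize};
use std::fs::OpenOptions;
use std::io::Write;

const TASK_BUF_SIZE: usize = 1024 * 1024;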

#[derive(Serialize, Deserialize, Debug)]
struct Task {
    message: String,
}

#[link(wasm_import_module = "decaton")]
extern "C" {
    fn poll_task(buf_addr: i64, buf_len: i32) -> i32;
}

#[no_mangle]
pub unsafe extern "C" fn _initialize() {
    let mut fd = 3;
    while let Ok(stat) = wasi::fd_prestat_get(fd) {
        match stat.tag {
            wasi::PREOPENTYPE_DIR => {
                let prefix_len = stat.u.dir.pr_name_len;
                let mut prefix = vec![0; prefix_len + 1];
                wasi::fd_prestat_dir_name(fd, prefix.as_mut_ptr(), prefix_len).unwrap();
                prefix[prefix_len] = '\0' as u8;
                libc::__wasilibc_register_preopened_fd(
                    fd as i32,
                    Box::into_raw(prefix.into_boxed_slice()) as *const i8,
                );
            }
            _ => break,
        }
        fd += 1;
    }
}

#[no_mangle]
pub unsafe extern "C" fn run() {
    let mut task_buf = [0u8; TASK_BUF_SIZE];
    let task_len = poll_task(task_buf.as_mut_ptr() as i64, task_buf.len() as i32);
    let task: Task = serde_json::from_slice(&task_buf[..task_len as usize]).unwrap();
    let mut file = OpenOptions::new()
        .append(true)
        .create(true)
        .open("/messages")
        .unwrap();
    writeln!(file, "{}", task.message).unwrap();
}

poll_task is declared as an externally defined function, which by contract is provided under the “decaton” namespace. The #[link(wasm_import_module = "decaton")] attribute tells the Rust compiler to treat it as an imported function in the generated Wasm module.

The first exported function is _initialize. This special function is called once when the Wasm module is loaded into a runtime. WASI has the concepts of commands and reactors, which correspond to the executable binaries and dynamically loadable libraries we’re familiar with. The odd-looking code in _initialize registers the directories preopened by the host with wasi-libc, so that paths like /messages can be resolved. A constructor in wasi-libc is supposed to do this automatically. Apparently, though, the current Rust compiler only honors it when generating a command (an executable binary). I therefore had to do the same work myself to make it work for a reactor module.
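The loop in _initialize relies on a WASI convention. File descriptors 0 to 2 are stdio, and preopened directories are numbered from 3 upward. fd_prestat_get fails (with EBADF in wasi-libc’s own preopen scan) on the first descriptor past the last preopen. Here’s a std-only sketch of that control flow against a mocked prestat table. `Prestat` and `mock_prestat_get` are hypothetical stand-ins, not the wasi crate’s API. The sketch also shows why the name buffer above is one byte longer than pr_name_len: wasi-libc expects a NUL-terminated C string.

```rust
use std::ffi::CString;

/// Hypothetical mirror of a WASI prestat entry (directories are the only preopen kind).
enum Prestat {
    Dir { name: String },
}

/// Hypothetical stand-in for `fd_prestat_get`: succeeds for preopened fds (3, 4, ...)
/// and fails for any other fd, including the first one past the last preopen.
fn mock_prestat_get(preopens: &[&str], fd: u32) -> Result<Prestat, &'static str> {
    fd.checked_sub(3)
        .and_then(|i| preopens.get(i as usize))
        .map(|name| Prestat::Dir { name: name.to_string() })
        .ok_or("EBADF")
}

/// Walks fds from 3 upward, as `_initialize` does, collecting the NUL-terminated
/// names that would be handed to `__wasilibc_register_preopened_fd`.
fn discover_preopens(preopens: &[&str]) -> Vec<(u32, CString)> {
    let mut found = Vec::new();
    let mut fd = 3;
    while let Ok(stat) = mock_prestat_get(preopens, fd) {
        match stat {
            Prestat::Dir { name } => {
                // CString appends the trailing '\0' that the C side expects.
                found.push((fd, CString::new(name).expect("no interior NUL")));
            }
        }
        fd += 1;
    }
    found
}
```

With wasmton’s `new PreopenDir("./", "/")`, the guest would see a single preopen named “/” on fd 3.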

The run function is the main part of the processor logic and follows the contract defined by Decaton. When called, it obtains the new task by calling poll_task, opens the messages file, and appends the task’s message to it.
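poll_task returns -1 when no task is pending or the buffer is too small. A negative value needs handling before the `as usize` cast, because -1 would become usize::MAX and the slice expression would panic. A defensive conversion from the raw return value to a slice could look like this. `task_slice` and `PollError` are hypothetical names, and the sketch is std-only so it can be tested natively.

```rust
/// Why poll_task could not deliver a task (hypothetical error type).
#[derive(Debug, PartialEq)]
enum PollError {
    /// poll_task returned a negative value: no pending task, or the buffer was too small.
    NoTask(i32),
    /// poll_task reported more bytes than the buffer holds (a host-side contract violation).
    TooLong(usize),
}

/// Converts poll_task's raw return value into the filled part of `buf`.
fn task_slice(buf: &[u8], ret: i32) -> Result<&[u8], PollError> {
    if ret < 0 {
        return Err(PollError::NoTask(ret));
    }
    let len = ret as usize;
    buf.get(..len).ok_or(PollError::TooLong(len))
}
```

Inside run, the result could then be passed to serde_json::from_slice, or the function could log the error and return early instead of panicking.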

Let’s see how it works.

@ ~/decaton-wasm-processor (master)
$ cargo wasi build
...

@ ~/decaton/wasmton (wasmton)
$ ../gradlew --no-daemon run --args "$BOOTSTRAP_SERVERS $TOPIC /path/to/decaton_wasm_processor.wasm"
...

# Produce some tasks to Kafka topic
$ ./bin/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --topic $TOPIC <<'EOS'
{"message":"Hello"}
{"message":"from"}
{"message":"Wasm!"}
EOS
...

@ ~/decaton/wasmton (wasmton)
$ cat messages
Hello
from
Wasm!

It worked exactly as I expected! We now have the first version of a Wasm-based Decaton processor working!

Processor with network I/O

Now that everything’s set up, it’s time to try something more practical and useful! In my anecdotal experience at LINE, 100% of Decaton processors perform some sort of network I/O, such as accessing external databases or calling web APIs.

So let’s try HTTP access from the processor. If it works, it proves that WebAssembly-based processors can be applied to real-world Decaton use cases.

I modified the processor to take a path from the task and send an HTTP GET request for that path to localhost. To keep things simple, I used a low-level TCP interface rather than an existing HTTP client.

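// Imports needed by this version (serde derives as before, plus socket and I/O types)
use serde::{Deserialize, Serialize};
use std::io::{self, Write};
use std::net::TcpStream;

#[derive(Serialize, Deserialize, Debug)]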
struct Task {
    path: String,
}

#[no_mangle]
pub unsafe extern "C" fn run() {
    let mut task_buf = [0u8; TASK_BUF_SIZE];
    let task_len = poll_task(task_buf.as_mut_ptr() as i64, task_buf.len() as i32);
    let task: Task = serde_json::from_slice(&task_buf[..task_len as usize]).unwrap();
    let mut stream = TcpStream::connect("127.0.0.1:8080").unwrap();
    write!(stream, "GET {} HTTP/1.0\r\n\r\n", task.path).unwrap();

    io::copy(&mut stream, &mut io::stdout()).unwrap();
}
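The request in run above is deliberately minimal. Because it uses HTTP/1.0 without keep-alive, the server closes the connection after sending its response. That is what lets io::copy finish, since it reads until EOF. The following std-only sketch separates those two pieces so they can be exercised without a network. `http_get` is a hypothetical helper that works over any Read + Write stream. A small in-memory `MockStream`, also hypothetical, replaces the TcpStream.

```rust
use std::io::{self, Read, Write};

/// Builds the same minimal request the processor sends: HTTP/1.0, so the server
/// closes the connection after responding and the reply ends at EOF.
fn get_request(path: &str) -> String {
    format!("GET {} HTTP/1.0\r\n\r\n", path)
}

/// Sends the request over any stream and copies the reply to `out` until EOF.
fn http_get<S: Read + Write, W: Write>(stream: &mut S, path: &str, out: &mut W) -> io::Result<u64> {
    stream.write_all(get_request(path).as_bytes())?;
    stream.flush()?;
    io::copy(stream, out)
}

/// In-memory stand-in for a TcpStream: records what was written, replays a canned reply.
struct MockStream {
    sent: Vec<u8>,
    reply: io::Cursor<Vec<u8>>,
}

impl Read for MockStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.reply.read(buf)
    }
}

impl Write for MockStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.sent.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

On a native target, a real TcpStream could be passed in place of the mock. Inside a WASI module, the failing step is opening the connection itself.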

# Start a simple HTTP server to test it (with Python 3: python3 -m http.server 8080)
@ ~/decaton (wasmton)
$ python -m SimpleHTTPServer 8080
...

@ ~/decaton/wasmton (wasmton)
$ ../gradlew --no-daemon run --args "$BOOTSTRAP_SERVERS $TOPIC /path/to/decaton_wasm_processor.wasm"
...

# Produce some tasks to Kafka topic
$ ./bin/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --topic $TOPIC <<'EOS'
{"path":"/README.md"}
EOS

Then I got the output below:

thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "operation not supported on wasm yet" }', src/lib.rs:22:22
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

“operation not supported on wasm yet.” At the time of writing, WASI doesn’t provide enough interfaces to use sockets. Rust’s standard library therefore returns the error above when we try to open a connection, and unwrap() turns it into a panic. Consequently, for now WASI is nearly useless for providing a language-agnostic Decaton processor interface.

It would’ve been easy to conclude this story by saying “WASI isn’t ready for this; I’m looking forward to its future!” but that alone wouldn’t be much fun.

In Part 2 of this article, we will look into how we could extend the WASI APIs to support sockets and make the example above work!