As of October 1, 2023, LINE has been rebranded as LY Corporation. Visit the new blog of LY Corporation here: LY Corporation Tech Blog



Untangle the threads between RxSwift and Combine

Question: What kind of bug can only be easily reproduced on a device as old as an iPhone 7 or older?

Hi there. I'm Zhihao Zang, an iOS engineer on the LINE B2B App Dev team. A big part of what we do involves developing and maintaining the LINE Official Account app. Today, I'd like to share our experience (including some of the hurdles we faced) in migrating the LINE Official Account app from using RxSwift to using Combine.

Background: RxSwift and Combine

We use RxSwift, a Reactive Programming framework, for the LINE Official Account app. RxSwift has a solid reputation and a pretty long history: the earliest release on GitHub is version 1.2, which was released on May 10, 2015. However, at WWDC 2019, Apple introduced its own "declarative framework for processing values over time": Combine. Even though Apple didn't specifically label Combine as a "Reactive Programming framework", it's widely considered a first-party solution for "a declarative programming paradigm concerned with data streams and the propagation of change", or in other words, Reactive Programming.

Beyond the shared ideology, Combine also features an API in which many operators appear to be direct replacements for their RxSwift counterparts.

As a result, there are loads of guides online that explain how to switch your RxSwift code to Combine. For instance, you can easily find an RxSwift to Combine Cheatsheet, and a quick search even turns up an "RxSwift to Combine: The Complete Transition Guide". Don't get me wrong, these resources are great for getting started. But if you follow them without thinking, you might stumble into a pitfall.

A collection of issues

A basic example

Here's a common pattern in reactive programming:

RxSwift sample 1
let disposeBag = DisposeBag()

observable
    .subscribe(onNext: { element in
        // Do something with the element
    })
    .disposed(by: disposeBag)

In this instance, we subscribe to a sequence (the observable), and respond to the elements it emits. Lastly, the entire subscription is kept in the disposeBag, meaning the subscription's life cycle is tied to the disposeBag.

The Combine equivalent of this logic is pretty straightforward. Just refer to the RxSwift to Combine Cheatsheet mentioned earlier, and you'll see that this is all you need to do:

Combine sample 1
var cancellables = Set<AnyCancellable>() // can also be other types of Collection

publisher
    .sink(receiveValue: { value in
        // Do something with the value
    })
    .store(in: &cancellables)

Similarly, a Publisher can also be viewed as an async sequence, or "a sequence of values over time". We subscribe to it with sink, and respond to the values it emits. Lastly, the entire subscription is kept in cancellables, meaning the subscription's life cycle is tied to cancellables.

Just find and replace. It's that easy. Right?

Unfortunately, the Combine version can easily run into trouble. The problem is that DisposeBag is thread-safe, while Swift Collections aren't. You can safely access a DisposeBag from different threads to store subscriptions in it, or to dispose them all. But with a Collection of AnyCancellables, concurrent modification can, if you're unlucky, lead to a race condition and a crash.
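Before reaching for a custom collection, one simpler workaround (a sketch of my own, not the approach we took in the app) is to confine every mutation of the Set to a single serial queue:

```swift
import Combine
import Foundation

// Sketch: all access to `cancellables` goes through one serial queue,
// so concurrent callers never mutate the Set at the same time.
let accessQueue = DispatchQueue(label: "cancellables-access-queue") // serial by default
var cancellables = Set<AnyCancellable>()

func store(_ publisher: AnyPublisher<Int, Never>) {
    accessQueue.sync {
        publisher
            .sink { _ in /* do something with the value */ }
            .store(in: &cancellables)
    }
}

// 100 concurrent callers; the serial queue makes the inserts safe.
DispatchQueue.concurrentPerform(iterations: 100) { i in
    store(Just(i).eraseToAnyPublisher())
}
print(accessQueue.sync { cancellables.count }) // 100
```

The trade-off is that every store now serializes, which is fine for subscription bookkeeping but wouldn't scale to hot paths — hence the thread-safe collection we build next.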

Let's make a thread-safe Collection!

Since the issue seems to be that Swift Collections aren't thread-safe, we can just make one that is! A quick online search seems like a good first step: Let's search "Swift Thread Safe Collection".

A Google search of "Swift Thread Safe Collection"

There seem to be many existing solutions that do exactly what we need! All of the results above the fold mention one pattern: a reader/writer pattern implemented with a concurrent DispatchQueue. The basic idea is as follows:

Reader/writer pattern
let queue = DispatchQueue(label: "array-access-queue", attributes: .concurrent)
queue.async(flags: .barrier) {
  // Write here
}
queue.sync {
  // Read here
}

Just a heads up: Although DispatchQueue isn't part of the Swift standard library API (it's part of the Dispatch Framework, also known as Grand Central Dispatch or GCD), I've found that specifying "Swift" when searching for iOS development-related topics often yields better results than using "iOS" (which usually brings up end-user-facing results) or a specific library name (which can be too narrow).

The reader/writer pattern is super useful. It ensures thread safety by making writes serial, and boosts performance by allowing concurrent reads.
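As a minimal illustration (my own toy example, not code from the app), here's the pattern applied to a simple counter: reads go through `sync`, writes through an `async` barrier:

```swift
import Dispatch

// A counter protected by the reader/writer pattern.
final class ReaderWriterCounter {
    private let queue = DispatchQueue(label: "counter-queue", attributes: .concurrent)
    private var count = 0

    // Writes are exclusive: the barrier delays them until in-flight reads finish.
    func increment() {
        queue.async(flags: .barrier) { self.count += 1 }
    }

    // Reads can run concurrently with each other, but are ordered after earlier barriers.
    var value: Int {
        queue.sync { count }
    }
}

let counter = ReaderWriterCounter()
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
    counter.increment()
}
// The final sync read is enqueued after every barrier write, so it sees all of them.
print(counter.value) // 1000
```

Note that the writes are async: callers don't wait for the write to land, which is exactly what makes this pattern fast — and, as we'll see, what makes it fragile.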

If we want to use this pattern with Combine to manage subscriptions, the documentation of store(in:) suggests our thread-safe collection needs to conform to RangeReplaceableCollection, which in turn conforms to Collection. I believe the best way to build a custom class that conforms to a complex protocol is step by step, from the ground up. So let's start by adding all the necessary components:

Reader/writer implementation 1
/// A thread-safe array that uses a queue to ensure that its read and write operations are atomic.
final class ThreadSafeArray<Element: Hashable> {
    private let queue = DispatchQueue(label: "thread-safe-array-queue", attributes: .concurrent)
    var array: [Element]

    init() {
        array = []
    }
}

Next, we'll make it conform to Collection. Since we're building this class with an Array, which already complies with Collection, this step mainly involves wrapping the corresponding API calls with the reader/writer pattern.

Reader/writer implementation 2
extension ThreadSafeArray: Collection {
    var startIndex: Array<Element>.Index {
        queue.sync {
            array.startIndex
        }
    }

    var endIndex: Array<Element>.Index {
        queue.sync {
            array.endIndex
        }
    }

    func index(after i: Array<Element>.Index) -> Array<Element>.Index {
        queue.sync {
            array.index(after: i)
        }
    }
}

Finally, we'll make it comply with RangeReplaceableCollection:

Reader/writer implementation 3
extension ThreadSafeArray: RangeReplaceableCollection {
    func replaceSubrange<C>(_ subrange: Range<Int>, with newElements: C) where C: Collection, Element == C.Element {
        queue.async(flags: .barrier) { [weak self] in
            self?.array.replaceSubrange(subrange, with: newElements)
        }
    }

    subscript(index: Array<Element>.Index) -> Array<Element>.Element {
        queue.sync {
            array[index]
        }
    }

    func removeAll() {
        queue.async(flags: .barrier) { [weak self] in
            self?.array.removeAll()
        }
    }
}

And now, we can use our new thread-safe array to store the subscription.

Combine sample 2
var cancellables = ThreadSafeArray<AnyCancellable>() // still needs to be mutable so it can be passed as `inout`

publisher
    .sink(receiveValue: { value in
        // Do something with the value
    })
    .store(in: &cancellables)

Looks like we've solved our problem! But, we're only halfway through our journey. Let's see what could potentially go wrong with this popular implementation.

Reader/writer lock and thread explosion

During our tests, we noticed a peculiar behavior: the implementation runs smoothly on newer devices but freezes on older ones, like the iPhone 7, even though the task is reasonably lightweight. Curiously, the UI remains responsive during the freeze.

To keep this article timely, I ran my query through ChatGPT 4 with the temperature set to 0.5:

Q: The following code deadlocks. Can you tell me why and how to resolve it? (followed by the ThreadSafeArray we created above)

A: The deadlock occurs due to the usage of queue.sync inside the subscript method. If you call this method from within a block of code that is already executing on the queue (for example, from within the replaceSubrange(_:with:) method or the removeAll() method), it will cause a deadlock because queue.sync will wait for the currently executing block to finish before it starts, but the currently executing block is waiting for queue.sync to finish. This situation is known as a deadlock.

Here’s how to solve it:

You can use a read and write pattern with the concurrent queue you have. This allows multiple reads to occur simultaneously but ensures that writes have exclusive access to the resource.

Spoiler alert: this isn't the solution. But consider it a hint of what's to come.

As it turns out, this implementation of the reader/writer pattern, which uses a concurrent DispatchQueue, can easily fall apart in one scenario: thread explosion.

According to Apple's documentation,

Dispatch queues execute tasks either serially or concurrently. Work submitted to dispatch queues executes on a pool of threads managed by the system.

But this pool has a limit. As stated in the same documentation:

When a task scheduled by a concurrent dispatch queue blocks a thread, the system creates additional threads to run other queued concurrent tasks. If too many tasks block, the system may run out of threads for your app.

In our code, both queue.sync and queue.async(flags: .barrier) cause waits. According to the documentation for the barrier flag:

When you add a barrier to a concurrent dispatch queue, the queue delays the execution of the barrier block (and any tasks submitted after the barrier) until all previously submitted tasks finish executing. After the previous tasks finish executing, the queue executes the barrier block by itself. Once the barrier block finishes, the queue resumes its normal execution behavior.

So, what happens if the thread pool is exhausted? Everything waits for everything else to finish, creating a deadlock.

This also explains why this issue mostly occurs on older devices: while the thread pool limit exists, it's never explicitly documented. It's widely believed that global queues can create up to 64 threads simultaneously. You might have seen this type of crash report on macOS: "Dispatch Thread Soft Limit Reached: 64".

Tweet from user @tualatrix (Chinese)

The main reasons for this limit are:

  1. The kernel has a MAX_PTHREAD_SIZE of 64 KB.
  2. Every thread costs "approximately 1 KB" in kernel data structures.

In other words, 64 KB ÷ 1 KB gives a ceiling of roughly 64 threads.

Additionally, in a 2021 WWDC session, it was stated:

If our news application has a hundred feed updates that need to be processed, this means that we have overcommitted the iPhone with 16 times more threads than cores.

Perhaps we can interpret "16 times more threads than cores" as "too many". The iPhone 7 has the Apple A10 Fusion SoC, which features 2 performance cores and 2 efficiency cores. However, only one type of core can be active at a time, so effectively the iPhone 7 has 2 cores, allowing for around 32 threads by that measure. This isn't a high thread limit for a modern app. Its successor, the Apple A11 Bionic, has 2 performance cores and 4 efficiency cores, all of which can run simultaneously, making it a 6-core SoC with a much higher thread limit.

Solution: The key is a lock

To solve this problem, we could properly lock the ThreadSafeArray. After all, using a DispatchQueue in this case is just a roundabout way to get a lock.

There are many types of locks available: NSLock, OSAllocatedUnfairLock, NSRecursiveLock, and so on. If we want to target iOS 15 or even older versions, OSAllocatedUnfairLock isn't an option. NSLock might be a popular choice, but it's not suitable here because it isn't reentrant: if we try to acquire it again on the same thread (say, when one of our ThreadSafeArray methods calls another one that's also protected by the lock), it will deadlock. Since an NSRecursiveLock "can be acquired multiple times by the same thread without causing a deadlock", it's the better choice here, just as ChatGPT suggested.
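To see the difference reentrancy makes, here's a small sketch (the names are mine): a method that holds the lock calls another member that takes the same lock. With a plain NSLock, the second acquisition would block forever; NSRecursiveLock just increments its ownership count:

```swift
import Foundation

final class Box {
    private let lock = NSRecursiveLock()
    private var items: [Int] = []

    func add(_ value: Int) {
        lock.lock(); defer { lock.unlock() }
        items.append(value)
    }

    var count: Int {
        lock.lock(); defer { lock.unlock() }
        return items.count
    }

    func removeAllIfNeeded() {
        lock.lock(); defer { lock.unlock() }
        // Re-enters the lock via `count`; with a plain NSLock this would deadlock.
        if count > 0 {
            items.removeAll()
        }
    }
}

let box = Box()
box.add(1)
box.add(2)
box.removeAllIfNeeded() // returns instead of deadlocking
print(box.count) // 0
```

This mirrors the shape of our ThreadSafeArray, where one locked member can easily end up calling another.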

As we're moving from RxSwift to Combine, we can look at the implementation of RxSwift's DisposeBag for reference.

RxSwift DisposeBag implementation
public final class DisposeBag: DisposeBase {
    private var lock = SpinLock()
    // ...
    private func _insert(_ disposable: Disposable) -> Disposable? {
        self.lock.performLocked {
            // ...
        }
    }
    // ...
}

We can see that it's using a SpinLock. This is a type alias of RecursiveLock, which is ultimately just a type alias of NSRecursiveLock in production code.

RxSwift lock implementation
/// Lock.swift

typealias SpinLock = RecursiveLock

extension RecursiveLock: Lock {
    @inline(__always)
    final func performLocked<T>(_ action: () -> T) -> T {
        self.lock(); defer { self.unlock() }
        return action()
    }
}


/// RecursiveLock.swift

typealias RecursiveLock = NSRecursiveLock

What a happy coincidence! Let's swap the concurrent DispatchQueue in our ThreadSafeArray for an NSRecursiveLock. As before, we'll start with the ingredients.

NSRecursiveLock implementation 1
/// `performLocked` isn't part of Foundation, so we first port the helper from RxSwift's `Lock.swift` shown above.
extension NSRecursiveLock {
    @inline(__always)
    func performLocked<T>(_ action: () -> T) -> T {
        lock(); defer { unlock() }
        return action()
    }
}

/// A thread-safe array that uses an `NSRecursiveLock` to ensure that its read and write operations are atomic.
final class ThreadSafeArray<Element: Hashable> {
    private let lock = NSRecursiveLock()

    var array: [Element]

    init() {
        array = []
    }
}

Then, we'll make it conform to Collection.

NSRecursiveLock implementation 2
extension ThreadSafeArray: Collection {
    var startIndex: Array<Element>.Index {
        lock.performLocked {
            array.startIndex
        }
    }

    var endIndex: Array<Element>.Index {
        lock.performLocked {
            array.endIndex
        }
    }

    func index(after i: Array<Element>.Index) -> Array<Element>.Index {
        lock.performLocked {
            array.index(after: i)
        }
    }
}

And lastly, RangeReplaceableCollection.

NSRecursiveLock implementation 3
extension ThreadSafeArray: RangeReplaceableCollection {
    func replaceSubrange<C>(_ subrange: Range<Int>, with newElements: C) where C: Collection, Element == C.Element {
        lock.performLocked {
            array.replaceSubrange(subrange, with: newElements)
        }
    }

    subscript(index: Array<Element>.Index) -> Array<Element>.Element {
        lock.performLocked {
            array[index]
        }
    }

    func removeAll() {
        lock.performLocked {
            array.removeAll()
        }
    }
}

Not done yet!

We're not going to leave this new class untested, are we? There are many aspects of this class that need testing: appending, removing, seeking... We can also run some more comprehensive tests. For instance, we could try creating numerous concurrent reads and writes to it:

Tests
let iterations = 1_000
func testStoringInManualDispatch() async throws {
    var cancellables = ThreadSafeArray<AnyCancellable>()
    var array = ThreadSafeArray<Int>()
    func store(index: Int) {
        Just(index).eraseToAnyPublisher()
            .sink { array.append($0) }
            .store(in: &cancellables)
    }
    DispatchQueue.concurrentPerform(iterations: iterations) { index in
        DispatchQueue.global().async {
            store(index: index)
        }
    }
    try await Task.sleep(nanoseconds: NSEC_PER_SEC)
    XCTAssertEqual(array.count, iterations)
    XCTAssertEqual(array.sorted(), Array(0...(iterations - 1)))
}

This test will crash quickly with Swift's Array or Set. It'll also hang due to deadlock with the ThreadSafeArray we implemented using a reader/writer pattern.

Takeaways

I want to wrap this up with some key points. They might seem obvious, but I believe they're worth repeating.

Firstly, I'd pay close attention to multi-thread-related issues when switching code from RxSwift to Combine. In many instances, RxSwift and Combine may seem similar, but they can also differ in unexpected ways. There's no one-size-fits-all solution, so we need to understand how each works, and test, test, test. We can always delve into the source code of RxSwift, but Combine isn't open source and probably never will be. That said, I sometimes find the OpenCombine project useful.

Secondly, I wouldn't blindly trust any random code snippets found online, including all the code above. An RxSwift to Combine Cheatsheet might be a good starting point for your migration, but it can also gloss over a lot of subtleties. A reader/writer lock is a useful pattern, but it might not suit every need. Once again, the best approach is to understand the mechanism, and test, test, test.