DEV Community

Cover image for Leak investigation: Rx disposal race in SQLDelight
Py ⚔
Py ⚔

Posted on

Leak investigation: Rx disposal race in SQLDelight

Header image: The In-Between by Romain Guy.

In this blog we'll look into how an easy mistake when using Observable.create() can lead to subtle leaks.

I recently investigated the following leak, which I couldn't reproduce systematically:

┬───
...
├─ com.example.hockey.PlayerQueries$selectAllQuery instance
│    ↓ Query.listeners
│            ~~~~~~~~~
├─ java.util.concurrent.CopyOnWriteArrayList instance
│    ↓ CopyOnWriteArrayList.array
│                           ~~~~~
├─ java.lang.Object[] array
│    ↓ Object[].[0]
│               ~~~
├─ sqldelight.runtime.rx.QueryListenerAndDisposable instance
│    Retaining 4.3 kB in 56 objects
│    ↓ QueryListenerAndDisposable.emitter
│                                 ~~~~~~~
... RxJava observer chain
├─ com.example.hockey.PlayersView$onAttachedToWindow$1 instance
│    Anonymous class implementing io.reactivex.functions.Function
│    ↓ PlayersView$onAttachedToWindow$1.this$0
│                                       ~~~~~~
╰→ com.example.hockey.view.PlayersView instance
     Leaking: YES (View.mContext references a destroyed activity)
Enter fullscreen mode Exit fullscreen mode

In the above leaktrace, PlayerQueries$selectAllQuery is a generated SQLDelight query. Our PlayersView is listening for updates to that query while the view is attached by leveraging Query.asObservable(). Once the view is detached, the observable chain is disposed and the query is expected to let go of the corresponding listener.

I inspected the heap dump and found that the view was indeed detached, the observable chain was correctly disposed, and yet the QueryListenerAndDisposable listener had not been removed from the query. Let's look at the Query.asObservable() implementation:

fun <T : Any> Query<T>.asObservable(): Observable<Query<T>> {
  return Observable.create(QueryOnSubscribe(this))
}

private class QueryOnSubscribe<T : Any>(
  private val query: Query<T>
) : ObservableOnSubscribe<Query<T>> {
  override fun subscribe(emitter: ObservableEmitter<Query<T>>) {
    val listener = QueryListenerAndDisposable(emitter, query)
    emitter.setDisposable(listener)
    query.addListener(listener)
    emitter.onNext(query)
  }
}

private class QueryListenerAndDisposable<T : Any>(
  private val emitter: ObservableEmitter<Query<T>>,
  private val query: Query<T>
) : AtomicBoolean(), Query.Listener, Disposable {
  override fun queryResultsChanged() {
    emitter.onNext(query)
  }

  override fun isDisposed() = get()

  override fun dispose() {
    if (compareAndSet(false, true)) {
      query.removeListener(this)
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Let's now look at the implementation example from Observable.create():

val observable = Observable.create<Event> { emitter ->
  val closeable = api.someMethod { event ->
    emitter.onNext(event)
  }
  emitter.setCancellable(closeable::close)
}
Enter fullscreen mode Exit fullscreen mode

We can rewrite the above example code to use a listener instead of a closeable:

val observable = Observable.create<Event> { emitter ->
  // 1. Create a listener
  val listener: (Event) -> Unit = { event ->
    emitter.onNext(event)
  }
  // 2. Set the listener
  api.addListener(listener)
  // 3. Remove the listener on dispose
  emitter.setCancellable {
    api.removeListener(listener)
  }
}
Enter fullscreen mode Exit fullscreen mode

The above implementation is fairly close to the SQLDelight implementation, with one major difference: SQLDelight sets the disposable before adding the listener to the query, i.e. something like this:

val observable = Observable.create<Event> { emitter ->
  // 1. Create a listener
  val listener: (Event) -> Unit = { event ->
    emitter.onNext(event)
  }
  // 2. Remove the listener on dispose
  emitter.setCancellable {
    api.removeListener(listener)
  }
  // 3. Set the listener
  api.addListener(listener)
}
Enter fullscreen mode Exit fullscreen mode

It turns out that's a mistake! If emitter is already disposed when the subscription runs, then we'll add the listener but never remove it:

val observable = Observable.create<Event> { emitter ->
  // 1. Create a listener
  val listener: (Event) -> Unit = { event ->
    emitter.onNext(event)
  }
  // 2. if emitter is currently already disposed,
  // the cancellable callback fires immediately and
  // there's no listener to remove yet.
  emitter.setCancellable {
    api.removeListener(listener)
  }
  // 3. Set the listener, which will never be removed. Leak!
  api.addListener(listener)
}
Enter fullscreen mode Exit fullscreen mode

Can emitter be already disposed when the subscription runs? Yes! This can happen if the subscription runs on a separate thread from the thread that called subscribe():

val subscription = observable.subscribeOn(Schedulers.io())
  .subscribe()

// `dispose()` might execute at the time as the subscription
// callback.
subscription.dispose()
Enter fullscreen mode Exit fullscreen mode

Take aways

  • When using Observable.create(), check the order in which you set a listener vs set the disposable.
  • I opened a PR to fix SQLDelight. In the meantime, remove subscribeOn calls for observables that originate from Query.asObservable(). As you saw from the code above, all Query.asObservable() is doing is setting a listener so there's no need to be on any special scheduler here.

Top comments (0)