Header image: The In-Between by Romain Guy.
In this post, we'll look at how an easy-to-make mistake with Observable.create() can lead to subtle memory leaks.
I recently investigated the following leak, which I couldn't reproduce systematically:
┬───
...
├─ com.example.hockey.PlayerQueries$selectAllQuery instance
│ ↓ Query.listeners
│ ~~~~~~~~~
├─ java.util.concurrent.CopyOnWriteArrayList instance
│ ↓ CopyOnWriteArrayList.array
│ ~~~~~
├─ java.lang.Object[] array
│ ↓ Object[].[0]
│ ~~~
├─ sqldelight.runtime.rx.QueryListenerAndDisposable instance
│ Retaining 4.3 kB in 56 objects
│ ↓ QueryListenerAndDisposable.emitter
│ ~~~~~~~
... RxJava observer chain
├─ com.example.hockey.PlayersView$onAttachedToWindow$1 instance
│ Anonymous class implementing io.reactivex.functions.Function
│ ↓ PlayersView$onAttachedToWindow$1.this$0
│ ~~~~~~
╰→ com.example.hockey.view.PlayersView instance
Leaking: YES (View.mContext references a destroyed activity)
In the above leak trace, PlayerQueries$selectAllQuery is a generated SQLDelight query. Our PlayersView listens for updates to that query while the view is attached, by leveraging Query.asObservable(). Once the view is detached, the observable chain is disposed and the query is expected to let go of the corresponding listener.
I inspected the heap dump and found that the view was indeed detached, the observable chain was correctly disposed, and yet the QueryListenerAndDisposable listener had not been removed from the query. Let's look at the Query.asObservable() implementation:
fun <T : Any> Query<T>.asObservable(): Observable<Query<T>> {
  return Observable.create(QueryOnSubscribe(this))
}

private class QueryOnSubscribe<T : Any>(
  private val query: Query<T>
) : ObservableOnSubscribe<Query<T>> {
  override fun subscribe(emitter: ObservableEmitter<Query<T>>) {
    val listener = QueryListenerAndDisposable(emitter, query)
    emitter.setDisposable(listener)
    query.addListener(listener)
    emitter.onNext(query)
  }
}

private class QueryListenerAndDisposable<T : Any>(
  private val emitter: ObservableEmitter<Query<T>>,
  private val query: Query<T>
) : AtomicBoolean(), Query.Listener, Disposable {
  override fun queryResultsChanged() {
    emitter.onNext(query)
  }

  override fun isDisposed() = get()

  override fun dispose() {
    if (compareAndSet(false, true)) {
      query.removeListener(this)
    }
  }
}
Let's now look at the usage example from the Observable.create() documentation:
val observable = Observable.create<Event> { emitter ->
  val closeable = api.someMethod { event ->
    emitter.onNext(event)
  }
  emitter.setCancellable(closeable::close)
}
We can rewrite the above example code to use a listener instead of a closeable:
val observable = Observable.create<Event> { emitter ->
  // 1. Create a listener
  val listener: (Event) -> Unit = { event ->
    emitter.onNext(event)
  }
  // 2. Set the listener
  api.addListener(listener)
  // 3. Remove the listener on dispose
  emitter.setCancellable {
    api.removeListener(listener)
  }
}
The above implementation is fairly close to the SQLDelight implementation, with one major difference: SQLDelight sets the disposable before adding the listener to the query, i.e. something like this:
val observable = Observable.create<Event> { emitter ->
  // 1. Create a listener
  val listener: (Event) -> Unit = { event ->
    emitter.onNext(event)
  }
  // 2. Remove the listener on dispose
  emitter.setCancellable {
    api.removeListener(listener)
  }
  // 3. Set the listener
  api.addListener(listener)
}
It turns out that's a mistake! If emitter is already disposed when the subscription runs, then we'll add the listener but never remove it:
val observable = Observable.create<Event> { emitter ->
  // 1. Create a listener
  val listener: (Event) -> Unit = { event ->
    emitter.onNext(event)
  }
  // 2. If the emitter is already disposed, the cancellable
  // callback fires immediately, before there is any listener
  // to remove.
  emitter.setCancellable {
    api.removeListener(listener)
  }
  // 3. Set the listener, which will never be removed. Leak!
  api.addListener(listener)
}
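
Here's a minimal sketch that makes this failure mode deterministic instead of relying on a thread race. It assumes RxJava 2 (the io.reactivex package seen in the leak trace). FakeApi and DisposeImmediately are hypothetical helpers written for this illustration, not part of RxJava or SQLDelight. The observer disposes inside onSubscribe(), which Observable.create() calls before running the subscription callback, so the emitter is already disposed when the callback starts.

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical stand-in for the `api` object used in the snippets above.
class FakeApi {
    val listeners = mutableListOf<(String) -> Unit>()
    fun addListener(listener: (String) -> Unit) { listeners.add(listener) }
    fun removeListener(listener: (String) -> Unit) { listeners.remove(listener) }
}

// Hypothetical observer that disposes as soon as it is subscribed, so the
// emitter is already disposed when the Observable.create() callback runs.
class DisposeImmediately<T : Any> : Observer<T> {
    override fun onSubscribe(d: Disposable) = d.dispose()
    override fun onNext(t: T) = Unit
    override fun onError(e: Throwable) = Unit
    override fun onComplete() = Unit
}

// Returns how many listeners are still registered after an early dispose.
fun leakedListenerCount(): Int {
    val api = FakeApi()
    val observable = Observable.create<String> { emitter ->
        val listener: (String) -> Unit = { event -> emitter.onNext(event) }
        // The emitter is already disposed, so this callback runs right away.
        // The listener is not registered yet, so there is nothing to remove.
        emitter.setCancellable { api.removeListener(listener) }
        // Registered after the cleanup already ran: nothing removes it later.
        api.addListener(listener)
    }
    observable.subscribe(DisposeImmediately<String>())
    return api.listeners.size
}

fun main() {
    println(leakedListenerCount())
}
```

With this ordering, one listener should remain registered after the early dispose, which is the same shape as the QueryListenerAndDisposable stuck in Query.listeners in the leak trace.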
Can emitter be already disposed when the subscription runs? Yes! This can happen if the subscription runs on a separate thread from the thread that called subscribe():
val subscription = observable.subscribeOn(Schedulers.io())
  .subscribe()
// `dispose()` might execute at the same time as the subscription
// callback.
subscription.dispose()
Takeaways
- When using Observable.create(), check the order in which you set the listener and the disposable: add the listener first, then set the disposable that removes it.
- I opened a PR to fix SQLDelight. In the meantime, remove subscribeOn calls for observables that originate from Query.asObservable(). As you saw from the code above, all Query.asObservable() is doing is setting a listener, so there's no need to be on any special scheduler here.
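
The first takeaway can be sketched as follows, again assuming RxJava 2. ListenerRegistry is a hypothetical stand-in for a SQLDelight Query, and safeUpdates() is an illustrative name, not the actual SQLDelight fix. The listener is registered before the cleanup is handed to the emitter, so if the emitter is already disposed, setCancellable() runs the cleanup immediately and removes the listener that was just added.

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical listener registry standing in for a SQLDelight Query.
class ListenerRegistry {
    private val listeners = mutableListOf<() -> Unit>()
    val size: Int get() = listeners.size
    fun add(listener: () -> Unit) { listeners.add(listener) }
    fun remove(listener: () -> Unit) { listeners.remove(listener) }
}

// Illustrative helper: emits Unit each time the registry notifies the listener.
fun safeUpdates(registry: ListenerRegistry): Observable<Unit> =
    Observable.create<Unit> { emitter ->
        val listener: () -> Unit = { emitter.onNext(Unit) }
        // 1. Register the listener first.
        registry.add(listener)
        // 2. Then hand the cleanup to the emitter. If the emitter is already
        // disposed, the cleanup runs here and removes the listener again.
        emitter.setCancellable { registry.remove(listener) }
    }

// Returns how many listeners are still registered after an early dispose.
fun remainingListenersAfterEarlyDispose(): Int {
    val registry = ListenerRegistry()
    safeUpdates(registry).subscribe(object : Observer<Unit> {
        // Dispose before the Observable.create() callback runs.
        override fun onSubscribe(d: Disposable) = d.dispose()
        override fun onNext(t: Unit) = Unit
        override fun onError(e: Throwable) = Unit
        override fun onComplete() = Unit
    })
    return registry.size
}

fun main() {
    println(remainingListenersAfterEarlyDispose())
}
```

With this ordering, no listener should remain registered even when the emitter is disposed before the subscription callback runs.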