DEV Community

Art Stesh

Posted on • Originally published at habr.com

RxJS Efficiency: Managing Performance and Optimizing Subscriptions

Introduction

RxJS is one of the most powerful tools in the Angular ecosystem, allowing you to work with asynchronous data streams with elegance and flexibility. That power has a cost: streams that are not managed properly cause memory leaks, which degrade performance quietly and are easy to miss. In this article, I share techniques for managing RxJS streams and avoiding those leaks.


Why Are Memory Leaks in RxJS Streams Dangerous?

RxJS makes it easy to consume data streams by calling the subscribe method. However, each subscription, like a timer or an event listener, lives until it is explicitly canceled or the stream completes. If a subscription stays active after its Angular component is destroyed, the following can happen:

  1. Data will continue to stream into the destroyed component.
  2. Dependent resources (e.g., references to DOM nodes or other streams) will not be released.
  3. Memory leaks occur, which is especially problematic in large applications with many users.
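
The effect can be reproduced outside Angular. The following is a minimal sketch, not taken from a real application: `source$` and `received` are illustrative names, and a `Subject` stands in for whatever long-lived stream a component might listen to.

```typescript
import { Subject } from 'rxjs';

// Stand-in for a long-lived stream, e.g. a service-level event bus (hypothetical)
const source$ = new Subject<number>();
const received: number[] = [];

// The callback closes over `received`, so the stream keeps it reachable
const subscription = source$.subscribe(value => received.push(value));

source$.next(1);            // delivered
subscription.unsubscribe(); // what ngOnDestroy should trigger
source$.next(2);            // no longer delivered

console.log(received);            // [1]
console.log(subscription.closed); // true
```

Without the `unsubscribe()` call, the second value would also reach the callback, and everything the callback references would stay in memory for as long as `source$` lives.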

Cold and Hot Streams

Cold Streams

A cold stream starts a new, independent execution for each subscriber. Every time you subscribe, the producer runs again from the beginning, which can repeat expensive work such as API requests or heavy computations.

Example:

import { Observable } from 'rxjs';

const cold$ = new Observable(observer => {
  console.log('Stream started');
  observer.next(Math.random());
  observer.complete();
});

cold$.subscribe(value => console.log('Subscriber 1:', value));
cold$.subscribe(value => console.log('Subscriber 2:', value));
// Console output:
// Stream started
// Subscriber 1: 0.123...
// Stream started
// Subscriber 2: 0.456...

Here, each subscriber gets its own copy of the stream.


Hot Streams

A hot stream is already running and works independently of subscribers. All subscribers receive data simultaneously, but late subscribers might miss initial events.

Example:

import { Subject } from 'rxjs';

const hot$ = new Subject<number>();

hot$.subscribe(value => console.log('Subscriber 1 receives:', value));

hot$.next(42); // Data sent to current subscribers

hot$.subscribe(value => console.log('Subscriber 2 receives:', value));
// Output:
// Subscriber 1 receives: 42

Here, Subscriber 2 misses the first value (42) because it subscribes after the value is sent.


How to Convert Cold Streams to Hot?

Use the share or shareReplay operators to make a stream hot.

import { interval } from 'rxjs';
import { share } from 'rxjs/operators';

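// share() multicasts one interval to all subscribers instead of starting a new one for each
const shared$ = interval(1000).pipe(share());

shared$.subscribe(value => console.log('Subscriber 1:', value));
// Join between two emissions (at 3.5 s) so the late subscriber's first value is unambiguous
setTimeout(() => shared$.subscribe(value => console.log('Subscriber 2:', value)), 3500);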
// Output:
// Subscriber 1: 0
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 1: 3
// Subscriber 2: 3

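The source now runs once and is shared (multicast) among all current subscribers. share does not replay past values, so Subscriber 2 only sees values emitted after it joins; use shareReplay when late subscribers also need the most recent value.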
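
To make the difference between the two operators concrete, here is a small synchronous sketch. It assumes a `Subject` as the source so the timing is deterministic; `source$`, `shared$`, `replayed$`, and `log` are illustrative names.

```typescript
import { Subject } from 'rxjs';
import { share, shareReplay } from 'rxjs/operators';

const source$ = new Subject<number>();
const shared$ = source$.pipe(share());          // multicast, no buffer
const replayed$ = source$.pipe(shareReplay(1)); // multicast, remembers the last value

const log: string[] = [];
shared$.subscribe(v => log.push(`share early: ${v}`));
replayed$.subscribe(v => log.push(`replay early: ${v}`));

source$.next(1);

// Late subscribers join after the first value was emitted
shared$.subscribe(v => log.push(`share late: ${v}`));
replayed$.subscribe(v => log.push(`replay late: ${v}`));

source$.next(2);

console.log(log);
```

The late `share` subscriber only sees `2`, while the late `shareReplay` subscriber receives the buffered `1` immediately on subscription and then `2`.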


RxJS Stream Management Strategies

Now, let’s explore how to master stream handling and eliminate leaks.


1. Proper Use of takeUntil

Instead of manually tracking and canceling subscriptions, use a completion signal stream. This can be implemented using Subject, which emits a value before the component is destroyed.

Example:

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subject, interval, takeUntil } from 'rxjs'; // operators are exported from 'rxjs' since 7.2

@Component({
  selector: 'app-example',
  template: `<p>The stream is active! Check the console.</p>`
})
export class ExampleComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    interval(1000) // every second
      .pipe(takeUntil(this.destroy$)) // complete the stream on destroy
      .subscribe(value => console.log(value));
  }

  ngOnDestroy() {
    this.destroy$.next(); // emit completion signal
    this.destroy$.complete(); // complete destroy$ itself so it holds no observers
  }
}

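Combine takeUntil with the ngOnDestroy lifecycle hook so that every stream is cleaned up together with the component. Place takeUntil last in the pipe: operators that come after it, such as switchMap, can keep inner subscriptions alive after destroy$ fires.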
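
The position of takeUntil inside the pipe matters. The sketch below is an illustration with made-up names (`trigger$`, `inner$`, `early`, `late`), using Subjects so that it runs synchronously; it compares takeUntil placed before and after switchMap.

```typescript
import { Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';

const destroy$ = new Subject<void>();
const trigger$ = new Subject<void>();
const inner$ = new Subject<string>(); // stand-in for a long-lived inner stream

const early: string[] = []; // takeUntil placed before switchMap
const late: string[] = [];  // takeUntil placed last

trigger$.pipe(takeUntil(destroy$), switchMap(() => inner$)).subscribe(v => early.push(v));
trigger$.pipe(switchMap(() => inner$), takeUntil(destroy$)).subscribe(v => late.push(v));

trigger$.next();      // both pipelines subscribe to inner$
inner$.next('a');     // both receive 'a'
destroy$.next();      // simulate ngOnDestroy
inner$.next('b');     // only the first pipeline is still listening

console.log(early); // ['a', 'b']
console.log(late);  // ['a']
```

In the first pipeline takeUntil only completes the outer stream, so the inner subscription created by switchMap survives the destroy signal. With takeUntil last, the whole chain is torn down.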


2. Use AsyncPipe to Minimize Manual Subscriptions

AsyncPipe automatically manages subscriptions and unsubscriptions in templates. Use it when passing data from components or services to the UI, reducing boilerplate code.

Example (no extra unsubscribe code required):

<div *ngIf="data$ | async as data">
  {{ data }}
</div>

Benefits:

  • Automatically unsubscribes when the component is destroyed.
  • Less code, so fewer chances for errors.

3. Limit Subscriptions Using take, first, or takeWhile

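If you only need the first value, or the first few, limit the subscription with take(n), first(), or takeWhile(predicate). The stream then completes on its own once the condition is met, and the subscription is released without manual cleanup. Note that first() raises an error if the source completes without emitting, while take(1) simply completes.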

Example:

// Assumes: import { take } from 'rxjs/operators';
this.someService.getData().pipe(
  take(1) // complete after the first value
).subscribe(data => console.log(data));

This avoids long-lived listeners and simplifies stream management.
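
For comparison, here is a short sketch of the three operators applied to the same source. The source `of(1, 2, 3, 4, 5)` and the array names are chosen purely for illustration.

```typescript
import { of } from 'rxjs';
import { first, take, takeWhile } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

const firstTwo: number[] = [];
source$.pipe(take(2)).subscribe(v => firstTwo.push(v)); // completes after two values

const firstEven: number[] = [];
source$.pipe(first(v => v % 2 === 0)).subscribe(v => firstEven.push(v)); // first match, then completes

const belowFour: number[] = [];
source$.pipe(takeWhile(v => v < 4)).subscribe(v => belowFour.push(v)); // completes when the predicate fails

console.log(firstTwo);  // [1, 2]
console.log(firstEven); // [2]
console.log(belowFour); // [1, 2, 3]
```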


4. Using Subscription and add()

When subscriptions are created manually, group them into a single parent Subscription object and clean them all up with one unsubscribe call.

Example:

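import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-example',
  template: `<p>Example with Subscription</p>`
})
export class ExampleComponent implements OnInit, OnDestroy {
  // Parent subscription: unsubscribing it also unsubscribes everything added to it
  private subscriptions = new Subscription();

  // `service` is assumed to be injected through the constructor (omitted here)

  ngOnInit() {
    const sub1 = this.service.getData1().subscribe();
    const sub2 = this.service.getData2().subscribe();

    this.subscriptions.add(sub1);
    this.subscriptions.add(sub2);
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}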

To reduce repetition, you can move the Subscription field and the ngOnDestroy logic into an abstract base class and have components extend it.
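
One possible shape for such a base class is sketched below. `SubscriptionOwner` and `PriceTicker` are hypothetical names, and the example is written as plain TypeScript so it runs without Angular; in a real Angular project the base class would likely also need a `@Directive()` decorator.

```typescript
import { Subject, Subscription } from 'rxjs';

// Hypothetical base class that owns the cleanup logic
abstract class SubscriptionOwner {
  protected readonly subscriptions = new Subscription();

  ngOnDestroy(): void {
    this.subscriptions.unsubscribe();
  }
}

// Hypothetical subclass standing in for a component
class PriceTicker extends SubscriptionOwner {
  readonly prices: number[] = [];

  constructor(prices$: Subject<number>) {
    super();
    this.subscriptions.add(prices$.subscribe(price => this.prices.push(price)));
  }
}

const prices$ = new Subject<number>();
const ticker = new PriceTicker(prices$);

prices$.next(101);
ticker.ngOnDestroy(); // Angular would call this when the component is destroyed
prices$.next(102);    // ignored: the subscription is already closed

console.log(ticker.prices); // [101]
```

A subclass that defines its own ngOnDestroy has to call `super.ngOnDestroy()`, otherwise the shared cleanup is silently skipped.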


5. Lazy Stream Loading with switchMap

When a component needs to react to changes in one stream by transforming it into another, use switchMap instead of manually subscribing to nested streams. This ensures only the latest stream remains active, canceling previous ones automatically.

Example (avoiding race conditions):

this.searchControl.valueChanges.pipe(
  switchMap(searchTerm => this.api.searchByTerm(searchTerm))
).subscribe(results => this.searchResults = results);

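Because switchMap unsubscribes from the previous inner stream whenever a new value arrives, an outdated request cannot overwrite newer results, and abandoned inner subscriptions do not accumulate. The outer subscription to valueChanges still has to be cleaned up as usual.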
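
The cancellation behavior can be observed without a real HTTP backend. In this sketch `fakeSearch` is a hypothetical stand-in for `api.searchByTerm`: it returns a Subject per term so the test can decide when each "response" arrives.

```typescript
import { Observable, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const term$ = new Subject<string>();
const pending = new Map<string, Subject<string>>();

// Hypothetical search call: the response is delivered later via `pending`
function fakeSearch(term: string): Observable<string> {
  const response$ = new Subject<string>();
  pending.set(term, response$);
  return response$;
}

const results: string[] = [];
term$.pipe(switchMap(term => fakeSearch(term))).subscribe(r => results.push(r));

term$.next('rx');   // first request starts
term$.next('rxjs'); // second request starts, first one is unsubscribed

pending.get('rx')!.next('results for rx');     // stale response, dropped
pending.get('rxjs')!.next('results for rxjs'); // latest response, delivered

console.log(results); // ['results for rxjs']
```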


6. Caching with shareReplay

If multiple subscribers use the same stream, avoid additional operations by caching data via shareReplay.

import { timer } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

const apiCall$ = timer(2000).pipe(shareReplay(1));

apiCall$.subscribe(value => console.log('Subscriber 1:', value));
apiCall$.subscribe(value => console.log('Subscriber 2:', value));

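The source executes only once, regardless of the number of subscribers, and later subscribers receive the cached value. Be aware that shareReplay(1) keeps its subscription to the source even after all subscribers have left; for sources that never complete, pass { bufferSize: 1, refCount: true } so the source is released when the last subscriber unsubscribes.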
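
A quick way to check that caching works is to count how often the source is executed. The sketch below assumes a synchronous fake request built with `defer` and `of`; `calls`, `request$`, and `cached$` are illustrative names. It uses the config-object form of shareReplay with `refCount: true`.

```typescript
import { defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let calls = 0;

// Fake request: the factory runs once per subscription to request$
const request$ = defer(() => {
  calls += 1;
  return of(`response #${calls}`);
});

const cached$ = request$.pipe(shareReplay({ bufferSize: 1, refCount: true }));

const seen: string[] = [];
cached$.subscribe(v => seen.push(v));
cached$.subscribe(v => seen.push(v)); // served from the replay buffer

console.log(calls); // 1
console.log(seen);  // ['response #1', 'response #1']
```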


7. Lazy Stream Initialization with defer

Sometimes a stream should not start immediately and only initialize on subscription. This is useful for heavy operations.

import { defer, timer } from 'rxjs';

const lazy$ = defer(() => {
  console.log('Stream initialization...');
  return timer(1000);
});

lazy$.subscribe(value => console.log(value));
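
One detail worth verifying is when the factory passed to defer actually runs. In this sketch the counter `initialized` is an illustrative name, and `of('ready')` replaces the timer so the result is available synchronously.

```typescript
import { defer, of } from 'rxjs';

let initialized = 0;

const lazy$ = defer(() => {
  initialized += 1; // stands in for an expensive setup step
  return of('ready');
});

const beforeSubscribe = initialized; // 0: nothing has run yet

lazy$.subscribe();
lazy$.subscribe();

console.log(beforeSubscribe); // 0
console.log(initialized);     // 2: the factory runs once per subscription
```

Since the factory runs again for every subscriber, defer is often combined with shareReplay when the expensive step should happen only once.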

8. Event Throttling with throttleTime

User events often occur too frequently (e.g., scroll, mousemove events). Optimize them using throttleTime:

import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

const scroll$ = fromEvent(document, 'scroll').pipe(throttleTime(200));

scroll$.subscribe(() => console.log('Scroll event handled!'));
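
The effect of throttleTime can be checked without a browser. This sketch replaces `fromEvent` with a Subject (an assumption made so it runs in Node) and fires three events in a burst; with the default settings only the first event in each 200 ms window is passed on.

```typescript
import { Subject } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

const events$ = new Subject<number>(); // stand-in for fromEvent(document, 'scroll')
const handled: number[] = [];

const subscription = events$
  .pipe(throttleTime(200))
  .subscribe(v => handled.push(v));

// Three events arrive within the same 200 ms window
events$.next(1);
events$.next(2);
events$.next(3);

subscription.unsubscribe(); // also cancels the pending throttle timer

console.log(handled); // [1]
```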

Tools for Monitoring Leaks

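  • Chrome DevTools: Take heap snapshots in the Memory tab before and after destroying a component, and use the memory timeline in the Performance tab to spot growth that never drops.
  • RxJS marble testing: Describes stream timing as marble diagrams (via TestScheduler), which makes it practical to assert when a stream completes or is unsubscribed.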
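
As an illustration of the second point, the sketch below uses `TestScheduler` from `rxjs/testing` to assert that takeUntil unsubscribes from its source. The comparison via `JSON.stringify` is an assumption made to keep the example independent of any test framework; in a real suite you would pass the framework's deep-equality assertion instead.

```typescript
import { TestScheduler } from 'rxjs/testing';
import { takeUntil } from 'rxjs/operators';

// Framework-free comparator (assumption): throws when the marbles differ
const scheduler = new TestScheduler((actual: unknown, expected: unknown) => {
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('Marble assertion failed');
  }
});

let marbleTestPassed = false;

scheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
  const source$ = cold('a-b-c-d-e|');
  const destroy$ = cold('-----x'); // destroy signal at frame 5

  expectObservable(source$.pipe(takeUntil(destroy$))).toBe('a-b-c|');
  // The source must be unsubscribed at frame 5, not left running
  expectSubscriptions(source$.subscriptions).toBe('^----!');
});

marbleTestPassed = true; // reached only if run() did not throw
console.log(marbleTestPassed);
```

The `expectSubscriptions` line is the part that catches leaks: it fails if the source is still subscribed after the destroy signal.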

Tips for Stream Optimization

  1. Always clean up subscriptions.
    Use unsubscribe, takeUntil, or AsyncPipe.

  2. Avoid redundant subscriptions.
    Share data with share or shareReplay.

  3. Profile streams with tap.
    Integrate tap to debug during development.

  4. Minimize complex operations.
    Use event throttling (throttleTime), limiting (take), and filtering (filter).
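
For the third tip, a small sketch of how tap can be used for tracing during development. It also uses `finalize`, which is not mentioned above, to record when the subscription is torn down; `trace` and `source$` are illustrative names.

```typescript
import { Subject } from 'rxjs';
import { finalize, tap } from 'rxjs/operators';

const trace: string[] = [];
const source$ = new Subject<number>();

const subscription = source$.pipe(
  tap(v => trace.push(`next ${v}`)),       // observe values without changing them
  finalize(() => trace.push('torn down'))  // runs on complete, error, or unsubscribe
).subscribe();

source$.next(1);
subscription.unsubscribe();
source$.next(2); // not traced: the subscription is closed

console.log(trace); // ['next 1', 'torn down']
```

If the "torn down" entry never shows up after a component is destroyed, the subscription is probably leaking.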


Conclusion

RxJS is a powerful tool, but it requires careful design to ensure that your code is not only functional but also fast, safe, and "clean."
