DEV Community

Art Stesh


RxJS Beyond Basics: Writing Your Own Operators

RxJS (Reactive Extensions for JavaScript) is a cornerstone of modern reactive programming in JavaScript, widely used in Angular and other frameworks. While RxJS provides a vast collection of operators to work with observables, there are scenarios where default operators don’t suffice, and crafting custom operators becomes necessary. This article will dive into writing your own RxJS operators and provide insights into their structure and use.


What are RxJS Operators?

At their core, RxJS operators transform, filter, or combine streams of data (observables). They are building blocks of reactive programming, allowing you to manipulate streams in a declarative and composable way.

Characteristics of RxJS Operators

  1. Composable: Operators can be chained together using the .pipe() method, creating elegant and concise code.
  2. Pure Functions: Operators follow functional programming principles. They do not modify the source observable but return a new one, and most are stateless and free of side effects.
  3. Observable-Centric: Every standard operator consumes and outputs observables, enabling seamless integration with RxJS pipelines.

Types of RxJS Operators

RxJS operators can be broadly divided into the following categories:

  1. Pipeable Operators: Functions imported from RxJS that operate on observables and are used inside a .pipe() method. For example: map, filter, mergeMap, etc.

  2. Creation Operators: Functions that produce a brand-new observable. For example: interval, of, from, forkJoin, etc.

Pipeable Operators vs Creation Operators

  • Pipeable Operators: Work on existing observables (source$.pipe(map(x => x * 2))).
  • Creation Operators: Create new observables (interval(1000)).

How Do Pipeable Operators Work?

Pipeable operators are simply functions that take an observable as an input and return a transformed observable. They allow chaining and composition by leveraging the Observable.pipe() method.

For example:

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0), // Allow only even numbers
    map(x => x * 10)          // Multiply each value by 10
  )
  .subscribe(console.log); 
// Output: 20, 40

Here’s how it works:

  1. filter() processes each value emitted by the source observable and passes only those meeting the condition.
  2. map() transforms the filtered data by multiplying it by 10.
  3. The result is emitted to the subscriber.
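
To make these mechanics concrete, here is a minimal sketch showing that an operator call returns an ordinary function from one observable to another, so piping amounts to applying those functions from left to right. The names onlyEven and timesTen are illustrative only and are not part of RxJS:

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Each operator call returns a plain function: Observable in, Observable out.
const onlyEven = filter((x: number) => x % 2 === 0);
const timesTen = map((x: number) => x * 10);

const source$ = of(1, 2, 3, 4, 5);

// Chaining with pipe() ...
const piped: number[] = [];
source$.pipe(onlyEven, timesTen).subscribe(v => piped.push(v));

// ... gives the same values as applying the functions by hand, left to right.
const manual: number[] = [];
timesTen(onlyEven(source$)).subscribe(v => manual.push(v));

console.log(piped, manual); // both arrays hold 20 and 40
```

Since of() emits synchronously, both arrays are already filled by the time console.log runs.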

Writing a Custom Pipeable Operator

Creating your own pipeable operator is straightforward: you write a higher-order function that accepts the operator's parameters and returns another function, which takes the source observable and returns a new one. This lets you abstract and reuse more complex stream manipulations.

Steps to Create a Pipeable Operator

  1. Import Required RxJS Tools: Import tools from RxJS like Observable and OperatorFunction.
  2. Define the Operator Function: Write a function that takes the operator's parameters and returns a function from the source observable to the resulting observable.
  3. Return an Observable: Inside that returned function, use source.pipe() with existing operators or the new Observable() constructor to emit the transformed values.

Example: Creating a Custom Pipeable Operator multiplyBy

Let’s write a custom operator that multiplies each emitted value by a provided number:

import { Observable, OperatorFunction } from 'rxjs';

export function multiplyBy(multiplier: number): OperatorFunction<number, number> {
  return (source: Observable<number>): Observable<number> => 
    new Observable<number>(subscriber => {
      const subscription = source.subscribe({
        next(value) {
          // Emit the multiplied value
          subscriber.next(value * multiplier);
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          subscriber.complete();
        }
      });

      // Return the teardown logic for cleanup
      return () => subscription.unsubscribe();
    });
}

Using the multiplyBy Operator

You can now use your operator as part of an RxJS pipe:

import { of } from 'rxjs';
import { multiplyBy } from './multiply-by';

of(1, 2, 3, 4)
  .pipe(multiplyBy(3))
  .subscribe(console.log);
// Output: 3, 6, 9, 12

Here, multiplyBy(3) takes the source values and multiplies them by 3 before emitting them.
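
Step 3 of the list above also mentions pipe() as a way to return the result. As a sketch of that route, the hypothetical multiplyByPiped below delegates to the built-in map operator instead of constructing an Observable by hand. It should behave the same as multiplyBy for this use case:

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical name: a pipe()-based variant of multiplyBy.
function multiplyByPiped(multiplier: number): OperatorFunction<number, number> {
  // Return a function that takes the source and pipes it through map().
  return (source: Observable<number>) =>
    source.pipe(map(value => value * multiplier));
}

const collected: number[] = [];
of(1, 2, 3, 4)
  .pipe(multiplyByPiped(3))
  .subscribe(value => collected.push(value));

console.log(collected); // holds 3, 6, 9, 12
```

Because map already forwards errors, completion, and unsubscription, this variant needs no manual teardown. The explicit new Observable() form is mainly useful when the logic cannot be expressed with existing operators.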


Creation Operators: Crafting New Observables

In addition to transforming existing observables, RxJS offers creation operators to produce observables from scratch. These are vital for creating streams from events, intervals, or custom user-defined logic.

Let’s write a creation operator called delayedRange, which emits the numbers in a range one at a time, with a fixed delay between emissions.

Example: delayedRange

The operator will:

  1. Start emitting numbers from the start value.
  2. Emit every number up to end.
  3. Include a delay between emissions.

Here’s the implementation:

import {Observable} from 'rxjs';

export function delayedRange(start: number, end: number, delayTime: number): Observable<number> {
  return new Observable<number>(subscriber => {
    let current = start;
    let timeoutId: ReturnType<typeof setTimeout> | null = null;
    // Emits values with the delay
    const emitValue = () => {
      if (current <= end) {
        subscriber.next(current);
        current++;
        timeoutId = setTimeout(emitValue, delayTime); // Emit the next value
      } else {
        subscriber.complete(); // Finish the stream if the last value is reached
      }
    };
    emitValue(); // Start values emission
    // Teardown: runs on unsubscribe and also once the stream completes
    return () => {
      if (timeoutId !== null) clearTimeout(timeoutId); // cancel the pending setTimeout
      console.log('Finished');
    };
  });
}

Explanation of the Code

  1. Observable Logic:

    • Use a self-rescheduling setTimeout to emit values sequentially.
    • Increment the current value (current) on each tick.
  2. Complete the Stream:

    • Once we reach the end value, call subscriber.complete() to finish the observable.
  3. Clean Up Resources:

    • Provide teardown logic to clear the pending timeout when the observable is unsubscribed.
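
The cleanup step can be observed by unsubscribing early. The sketch below repeats a compact copy of delayedRange so that it runs on its own (the copy uses clearTimeout in its teardown and omits the log statement), then unsubscribes right after subscribing:

```typescript
import { Observable } from 'rxjs';

// Compact copy of delayedRange so the sketch is self-contained.
function delayedRange(start: number, end: number, delayTime: number): Observable<number> {
  return new Observable<number>(subscriber => {
    let current = start;
    let timeoutId: ReturnType<typeof setTimeout> | null = null;
    const emitValue = () => {
      if (current <= end) {
        subscriber.next(current++);
        timeoutId = setTimeout(emitValue, delayTime);
      } else {
        subscriber.complete();
      }
    };
    emitValue();
    // Teardown: cancel whatever timer is still pending.
    return () => {
      if (timeoutId !== null) clearTimeout(timeoutId);
    };
  });
}

const received: number[] = [];
const subscription = delayedRange(1, 5, 1000).subscribe(v => received.push(v));

// The first value is emitted synchronously during subscribe().
// Unsubscribing now runs the teardown, which cancels the pending timer,
// so the remaining values are never emitted.
subscription.unsubscribe();

console.log(received, subscription.closed); // received holds only 1; closed is true
```

Without the teardown, the subscriber would still stop receiving values after unsubscribe, but the timer would keep the callback scheduled for another tick; clearing it releases that resource right away.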

Using delayedRange

import { delayedRange } from './delayed-range';

delayedRange(1, 5, 1000).subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Complete!')
});
// Output (with 1 second delay between each):
// 1
// 2
// 3
// 4
// 5
// Complete!
// Finished   (logged by the teardown, which also runs after completion)
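
For comparison, a similar stream can likely be assembled from existing RxJS building blocks instead of a hand-written Observable. The sketch below is an alternative under stated assumptions, not the implementation above: delayedRangeFromTimer and its optional scheduler parameter are hypothetical names introduced here, built on timer, take, and map. Passing a VirtualTimeScheduler lets the example run instantly rather than waiting in real time:

```typescript
import { Observable, SchedulerLike, VirtualTimeScheduler, asyncScheduler, timer } from 'rxjs';
import { map, take } from 'rxjs/operators';

// Hypothetical variant of delayedRange composed from existing operators.
// The scheduler parameter is an addition made here for testability.
function delayedRangeFromTimer(
  start: number,
  end: number,
  delayTime: number,
  scheduler: SchedulerLike = asyncScheduler
): Observable<number> {
  const count = Math.max(0, end - start + 1); // empty range => no values
  return timer(0, delayTime, scheduler).pipe(
    take(count),          // stop after the last number in the range
    map(i => start + i)   // turn the tick index into the range value
  );
}

// Virtual time: scheduled work runs when flush() is called.
const virtualScheduler = new VirtualTimeScheduler();
const values: number[] = [];
let completed = false;

delayedRangeFromTimer(1, 5, 1000, virtualScheduler).subscribe({
  next: v => values.push(v),
  complete: () => { completed = true; }
});
virtualScheduler.flush();

console.log(values, completed);
```

One behavioral difference to keep in mind: this variant completes immediately after the last value, whereas the hand-written delayedRange completes one delayTime later.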

Key Takeaways

  1. Operators are the Heart of RxJS: They enable chaining and provide meaningful transformations to observables.
  2. Custom Operators Unlock Creativity: Pipeable operators help abstract complex logic, while creation operators allow building streams tailored to specific requirements.
  3. Patterns Matter: Writing reusable and self-contained operators makes your reactive code cleaner and easier to maintain.

RxJS is not just about using pre-built utilities — it's about understanding the patterns and unlocking the full power of reactive programming. Custom operators serve as a bridge between the generality of RxJS and the specific needs of your application.

Keep experimenting! The more you dive into RxJS, the more intuitive and elegant your streams will become.

Happy coding! 🚀
