RxJS (Reactive Extensions for JavaScript) is a cornerstone of modern reactive programming in JavaScript, widely used in Angular and other frameworks. While RxJS provides a vast collection of operators to work with observables, there are scenarios where default operators don’t suffice, and crafting custom operators becomes necessary. This article will dive into writing your own RxJS operators and provide insights into their structure and use.
What are RxJS Operators?
At their core, RxJS operators transform, filter, or combine streams of data (observables). They are building blocks of reactive programming, allowing you to manipulate streams in a declarative and composable way.
Characteristics of RxJS Operators
- Composable: Operators can be chained together using the .pipe() method, creating elegant and concise code.
- Pure Functions: Operators follow functional programming principles and are generally stateless and side-effect-free.
- Observable-Centric: Every standard operator consumes and outputs observables, enabling seamless integration with RxJS pipelines.
Types of RxJS Operators
RxJS operators can be broadly divided into the following categories:
- Pipeable Operators: Functions imported from RxJS that operate on observables and are used inside a .pipe() method. For example: map, filter, mergeMap, etc.
- Creation Operators: Functions that produce a brand-new observable. For example: interval, of, from, forkJoin, etc.
Pipeable Operators vs Creation Operators
- Pipeable Operators: Work on existing observables (source$.pipe(map(x => x * 2))).
- Creation Operators: Create new observables (interval(1000)).
How Do Pipeable Operators Work?
Pipeable operators are simply functions that take an observable as input and return a transformed observable. They allow chaining and composition by leveraging the Observable.pipe() method.
For example:
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0), // Allow only even numbers
    map(x => x * 10)          // Multiply each value by 10
  )
  .subscribe(console.log);
// Output: 20, 40
Here’s how it works:
- filter() processes each value emitted by the source observable and passes only those meeting the condition.
- map() transforms the filtered data by multiplying it by 10.
- The result is emitted to the subscriber.
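Because a pipeable operator is just a function from one observable to another, calling it directly and passing it to .pipe() should produce the same stream. The sketch below illustrates this; the names double and collect are made up for the example and are not part of RxJS.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// map(...) returns a function of type (source: Observable<number>) => Observable<number>.
const double = map((x: number) => x * 2);

// Hypothetical helper for this sketch: gathers the values of a synchronous observable.
function collect<T>(source: Observable<T>): T[] {
  const values: T[] = [];
  source.subscribe(value => values.push(value));
  return values;
}

// Calling the operator directly on a source...
const direct = collect(double(of(1, 2, 3)));

// ...is equivalent to passing it to pipe(), which applies operators left to right.
const piped = collect(of(1, 2, 3).pipe(double));

console.log(direct); // the array [2, 4, 6]
console.log(piped);  // the array [2, 4, 6]
```

Seen this way, .pipe() is mostly a readable way to apply a sequence of such functions to a source.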
Writing a Custom Pipeable Operator
Creating your own pipeable operator is straightforward: you write a higher-order function that accepts any configuration parameters and returns a function mapping a source observable to a new observable. This allows you to abstract and reuse more complex stream manipulations.
Steps to Create a Pipeable Operator
- Import Required RxJS Tools: Import tools from RxJS like Observable and OperatorFunction.
- Define the Operator Function: Write a function that takes input parameters and applies the desired logic.
- Return an Observable: Use the observable$.pipe() method or the new Observable() constructor to return transformed values.
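The last step names two ways to return the transformed observable. As a minimal sketch of the first one, an operator can be assembled from existing operators with source.pipe(). The name multiplyEvensBy is hypothetical and chosen only for this illustration.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical operator: keeps even numbers and multiplies them by `multiplier`.
function multiplyEvensBy(multiplier: number): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    source.pipe(
      filter(value => value % 2 === 0),
      map(value => value * multiplier)
    );
}

const results: number[] = [];
of(1, 2, 3, 4, 5)
  .pipe(multiplyEvensBy(10))
  .subscribe(value => results.push(value));

console.log(results); // the array [20, 40]
```

Composing built-in operators this way leaves subscription handling, error forwarding, and teardown to RxJS. The new Observable() constructor is only needed when no combination of existing operators expresses the logic.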
Example: Creating a Custom Pipeable Operator multiplyBy
Let’s write a custom operator that multiplies each emitted value by a provided number:
import { Observable, OperatorFunction } from 'rxjs';

export function multiplyBy(multiplier: number): OperatorFunction<number, number> {
  return (source: Observable<number>): Observable<number> =>
    new Observable<number>(subscriber => {
      const subscription = source.subscribe({
        next(value) {
          // Emit the multiplied value
          subscriber.next(value * multiplier);
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          subscriber.complete();
        }
      });
      // Return the teardown logic for cleanup
      return () => subscription.unsubscribe();
    });
}
Using the multiplyBy Operator
You can now use your operator as part of an RxJS pipe:
import { of } from 'rxjs';
import { multiplyBy } from './multiply-by';

of(1, 2, 3, 4)
  .pipe(multiplyBy(3))
  .subscribe(console.log);
// Output: 3, 6, 9, 12
Here, multiplyBy(3) takes the source values and multiplies them by 3 before emitting them.
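Because multiplyBy forwards error and complete notifications and returns a teardown function, it should behave like a built-in operator inside a longer chain. The sketch below repeats the operator in condensed form so that it runs on its own, then pairs it with take and catchError. The failing$ source and the fallback value -1 are invented for the illustration.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';
import { catchError, take } from 'rxjs/operators';

// The multiplyBy operator from the article, condensed so this snippet is self-contained.
function multiplyBy(multiplier: number): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    new Observable<number>(subscriber => {
      const subscription = source.subscribe({
        next: value => subscriber.next(value * multiplier),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
      return () => subscription.unsubscribe();
    });
}

// take(2) completes after two values; later values from the source are dropped.
const firstTwo: number[] = [];
of(1, 2, 3, 4)
  .pipe(multiplyBy(3), take(2))
  .subscribe(value => firstTwo.push(value));
console.log(firstTwo); // the array [3, 6]

// Hypothetical source that emits one value and then fails.
const failing$ = new Observable<number>(subscriber => {
  subscriber.next(5);
  subscriber.error(new Error('boom'));
});

// The error passes through multiplyBy and is replaced by a fallback value.
const recovered: number[] = [];
failing$
  .pipe(multiplyBy(3), catchError(() => of(-1)))
  .subscribe(value => recovered.push(value));
console.log(recovered); // the array [15, -1]
```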
Creation Operators: Crafting New Observables
In addition to transforming existing observables, RxJS offers creation operators to produce observables from scratch. These are vital for creating streams from events, intervals, or custom user-defined logic.
Let’s create a creation operator called delayedRange, which emits numbers in a range with a defined delay between them.
Example: delayedRange
The operator will:
- Start emitting numbers from the start value.
- Emit every number up to end.
- Include a delay between emissions.
Here’s the implementation:
import { Observable } from 'rxjs';

export function delayedRange(start: number, end: number, delayTime: number): Observable<number> {
  return new Observable<number>(subscriber => {
    let current = start;
    let timeoutId: ReturnType<typeof setTimeout> | null = null;

    // Emits the current value, then schedules the next step after the delay
    const emitValue = () => {
      if (current <= end) {
        subscriber.next(current);
        current++;
        timeoutId = setTimeout(emitValue, delayTime); // Schedule the next step
      } else {
        subscriber.complete(); // Finish the stream once the last value has been emitted
      }
    };

    emitValue(); // Start emitting values

    // Teardown function: runs on unsubscribe and after completion
    return () => {
      if (timeoutId !== null) clearTimeout(timeoutId);
      console.log('Finished');
    };
  });
}
Explanation of the Code
- Observable Logic:
  - Use setTimeout to schedule each emission after the previous one.
  - Increment the current value (current) on each tick.
- Complete the Stream:
  - Once we pass the end value, call subscriber.complete() to finish the observable.
- Clean Up Resources:
  - Provide teardown logic to clear the pending timeout when the observable is unsubscribed.
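A similar stream can also be assembled from built-in operators instead of manual timers. The following is a sketch under assumptions, not a drop-in replacement: the name delayedRangeFromTimer and the optional scheduler parameter are hypothetical, the first value is delivered asynchronously, and the stream completes right after the last value rather than one delay later.

```typescript
import { Observable, SchedulerLike, VirtualTimeScheduler, asyncScheduler, timer } from 'rxjs';
import { map, take } from 'rxjs/operators';

// Hypothetical variant of delayedRange composed from timer, map, and take.
function delayedRangeFromTimer(
  start: number,
  end: number,
  delayTime: number,
  scheduler: SchedulerLike = asyncScheduler // assumption: injectable so tests can use virtual time
): Observable<number> {
  const count = Math.max(0, end - start + 1); // an empty range yields no values
  return timer(0, delayTime, scheduler).pipe(
    map(index => start + index), // timer emits 0, 1, 2, ...
    take(count)                  // complete after the last value, which also stops the timer
  );
}

// Virtual time lets the sketch run without waiting for real delays.
const virtualTime = new VirtualTimeScheduler();
const emitted: number[] = [];
let completed = false;

delayedRangeFromTimer(1, 5, 1000, virtualTime).subscribe({
  next: value => emitted.push(value),
  complete: () => { completed = true; }
});

virtualTime.flush(); // run all scheduled timer ticks immediately
console.log(emitted, completed); // values 1 through 5, then true
```

Delegating to timer and take removes the manual timeout bookkeeping, since RxJS cancels the timer when the subscriber unsubscribes. The hand-written version remains useful when the emission logic does not map onto existing operators.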
Using delayedRange
import { delayedRange } from './delayed-range';

delayedRange(1, 5, 1000).subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Complete!')
});
// Output (with 1 second delay between each):
// 1
// 2
// 3
// 4
// 5
// Complete!
// Finished (logged by the teardown function, which also runs after completion)
Key Takeaways
- Operators are the Heart of RxJS: They enable chaining and provide meaningful transformations to observables.
- Custom Operators Unlock Creativity: Pipeable operators help abstract complex logic, while creation operators allow building streams tailored to specific requirements.
- Patterns Matter: Writing reusable and self-contained operators makes your reactive code cleaner and easier to maintain.
RxJS is not just about using pre-built utilities — it's about understanding the patterns and unlocking the full power of reactive programming. Custom operators serve as a bridge between the generality of RxJS and the specific needs of your application.
Keep experimenting! The more you dive into RxJS, the more intuitive and elegant your streams will become.
Happy coding! 🚀