DEV Community

Cover image for Operators & Subscription in Observables

Operators & Subscription in Observables

Introduction

We discussed Observables in the previous post. Observables provide us data streams over time. Once an Observables produces data, we need to be able to modify/manipulate it or use it. To do this is where operators and subscriptions come in.

Operators are functions that operate on an Observable and return an Observable. This allows us to chain these operators. Each operator in the chain modifies the Observable that results from the operation of the previous operator.

Operators in a chain do not work simultaneously, but they operate in sequence, each one working on the Observable generated by the operator immediately before in the chain.

Subscription is done using the subscribe operator. A subscribe operator allows the observer to connect with an Observable. For an observer to get data or error from an Observable, it first has to subscribe to that Observable.

Operators

Operators in RxJS are categorized into two sections:

  1. Pipeable operators are operators that can be chained together. These are pure functions that take an observable as input and provide an observable as output.
observeable.pipe(
  operator1(),
  operator2(),
  operator3(),
  operator3(),
)

operator1 will take in the observable, perform an operation on it and emit an observable. The emitted observable from operator1 is passed to operator2 as input (and so forth through the rest of the operators).

filter, mergeMap and forkJoin are some examples of pipeable operators.

  1. Creation operators are standalone functions that create a new Observable.
import { of } from 'rxjs';
const observable = of(1, 2, 3);

The variable observable is an Observable that will emit 1, 2, and 3 (in sequence).

create, of and from are examples of creation operators.

Subscription

Before we get into subscription, let's understand the difference between hot and cold observable.

A β€œcold” πŸ₯Ά Observable does not begin to emit values until an observer has subscribed to it.

A β€œhot” πŸ”₯ Observable, on the other hand, can begin emitting values at any time, and a subscriber may begin observing emitted values at any time. But, the subscriber might be missing out on any values emitted before the time of the subscription.

So how do we subscribe to an Observable?

const observable = Observable.create((observer:any) => {
    observer.next("Hello World!");
})

observable.subscribe((message) => console.log(message)); // Hello World!

We've created a variable observable that is an Observable. It returns or emits the value "Hello World!". We can subscribe to observable and get that value using a callback to subscribe.

Our observer can implement 0 to 3 methods in RxJS: onNext, onError and onCompleted.

onNext is called when the Observable emits a value.
onError is called when the Observable fails to generate value.
onCompeted is called by the Observable when it has called onNext for the final time (if it hasn't run into an error).

Top comments (0)