DEV Community

Rajesh Rathore

Observables and Observers in RxJS

RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects), and operators inspired by Array methods (map, filter, reduce, every, etc.) to allow handling asynchronous events as collections.

Think of RxJS as Lodash for events.

The essential concepts in RxJS which solve async event management are:

  • Observable: represents the idea of an invokable collection of future values or events.
  • Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.
  • Subscription: represents the execution of an Observable, and is primarily useful for cancelling the execution.
  • Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc.
  • Subject: is equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
  • Schedulers: are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.
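To see how several of these concepts fit together, here is a minimal sketch. It assumes RxJS 7.2 or later, where operators such as filter and map can be imported directly from 'rxjs'; the names evenTimesTen$ and received are made up for illustration.

```typescript
import { of, filter, map } from 'rxjs';

// Collects whatever the Observer receives, so the result can be inspected.
const received: number[] = [];

// Observable + Operators: keep the even numbers, then multiply them by 10.
const evenTimesTen$ = of(1, 2, 3, 4).pipe(
  filter((n) => n % 2 === 0),
  map((n) => n * 10),
);

// Observer: an object with callbacks. Subscription: the value returned by subscribe().
const subscription = evenTimesTen$.subscribe({
  next: (value) => received.push(value),
});

console.log(received); // [ 20, 40 ]

// The source has already completed here, so this call is only a harmless formality.
subscription.unsubscribe();
```

Nothing runs until subscribe() is called; the pipe() call only describes what should happen to each value.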

Before going deeper into Observables and Promises, we need to understand Pull and Push.

Pull versus Push

Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.

What is Pull? In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.

Every JavaScript Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming it by "pulling" out a single return value from its call.

What is Push? In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.

Promises are the most common type of Push system in JavaScript today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers), but unlike functions, it is the Promise which is in charge of determining precisely when that value is "pushed" to the callbacks.

RxJS introduces Observables, a new Push system for JavaScript. An Observable is a Producer of multiple values, "pushing" them to Observers (Consumers).

          SINGLE VALUE    MULTIPLE VALUES
Pull      Function        Iterator
Push      Promise         Observable
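
The first three cells of this table can be sketched with plain language features, no RxJS needed. The names getAnswer, countTo and pushedValues are hypothetical and exist only for this illustration; the fourth cell, the Observable, is what the rest of this post is about.

```typescript
// Pull, single value: the caller decides when to ask.
function getAnswer(): number {
  return 42;
}
const pulledOnce = getAnswer();

// Pull, multiple values: the caller asks the iterator for each value.
function* countTo(limit: number): Generator<number, void, unknown> {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}
const iterator = countTo(3);
const first = iterator.next().value;
const second = iterator.next().value;

// Push, single value: the Promise decides when the callback runs.
const pushedValues: number[] = [];
Promise.resolve(42).then((value) => {
  pushedValues.push(value);
});
// The callback has not run yet at this point, so nothing has been pushed so far.
const pushedSoFar = pushedValues.length;
```

Even though the Promise is already resolved, its callback only runs after the current synchronous code has finished, which is why the Consumer cannot know exactly when the value arrives.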

Observable:

Observables are lazy Push collections of multiple values.

Observables are like functions with zero arguments, but generalize those to allow multiple values.

Consider the following:

function foo() {
  console.log('Hello');
  return 42;
}

const x = foo.call(); // same as foo()
console.log(x);
const y = foo.call(); // same as foo()
console.log(y);

We expect to see as output:

"Hello"
42
"Hello"
42

You can write the same behavior above, but with Observables:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
  console.log('Hello');
  subscriber.next(42);
});

foo.subscribe((x) => {
  console.log(x);
});
foo.subscribe((y) => {
  console.log(y);
});

And the output is the same:

"Hello"
42
"Hello"
42

This happens because both functions and Observables are lazy computations. If you don't call the function, the console.log('Hello') won't happen. The same goes for Observables: if you don't "call" one (with subscribe), the console.log('Hello') won't happen.

Subscribing to an Observable is analogous to calling a Function.

Some people claim that Observables are asynchronous. That is not true. If you surround a function call with logs, like this:

console.log('before');
console.log(foo.call());
console.log('after');

You will see the output:

"before"
"Hello"
42
"after"

And this is the same behavior with Observables:

console.log('before');
foo.subscribe((x) => {
  console.log(x);
});
console.log('after');

And the output is:

"before"
"Hello"
42
"after"

This proves that the subscription of foo was entirely synchronous, just like a function call.

Observables are able to deliver values either synchronously or asynchronously.

What is the difference between an Observable and a function? Observables can "return" multiple values over time, something which functions cannot. You can't do this:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

Functions can only return one value. Observables, however, can do this:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100); // "return" another value
  subscriber.next(200); // "return" yet another
});

console.log('before');
foo.subscribe((x) => {
  console.log(x);
});
console.log('after');

With synchronous output:

"before"
"Hello"
42
100
200
"after"

But you can also "return" values asynchronously:

import { Observable } from 'rxjs';

const foo = new Observable((subscriber) => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});

console.log('before');
foo.subscribe((x) => {
  console.log(x);
});
console.log('after');

Output:

"before"
"Hello"
42
100
200
"after"
300

Conclusion:

  • func.call() means "give me one value synchronously"
  • observable.subscribe() means "give me any amount of values, either synchronously or asynchronously"

Core Observable concerns:

  • Creating Observables: The Observable constructor takes one argument: the subscribe function.

import { Observable } from 'rxjs';

// Pushes the string 'hi' to the subscriber once every second.
const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
});

Observables can be created with new Observable. Most commonly, observables are created using creation functions, like of, from, interval, etc.
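
As a rough illustration of two of these creation functions, the sketch below uses of and from, both of which deliver their values synchronously. The array names are hypothetical helpers for collecting the output.

```typescript
import { of, from } from 'rxjs';

// of(...) emits each argument in order, then completes.
const fromOf: number[] = [];
of(1, 2, 3).subscribe((value) => fromOf.push(value));

// from(...) turns an array (among other inputs) into an Observable of its items.
const fromArray: string[] = [];
from(['a', 'b']).subscribe((value) => fromArray.push(value));

console.log(fromOf); // [ 1, 2, 3 ]
console.log(fromArray); // [ 'a', 'b' ]
```

interval works along the same lines, except that it emits an increasing counter asynchronously on a timer and never completes by itself.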

  • Subscribing to Observables: The Observable named observable in the example above can be subscribed to, like this:

observable.subscribe((x) => console.log(x));

Subscribing to an Observable is like calling a function, providing callbacks to which the data will be delivered.

  • Executing the Observable: The code inside new Observable(function subscribe(subscriber) {...}) represents an "Observable execution", a lazy computation that only happens for each Observer that subscribes. The execution produces multiple values over time, either synchronously or asynchronously.

There are three types of values an Observable Execution can deliver:

  • "Next" notification: sends a value such as a Number, a String, an Object, etc.
  • "Error" notification: sends a JavaScript Error or exception.
  • "Complete" notification: does not send a value.

    In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
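
The rule that nothing is delivered after Complete can be checked with a small sketch like the one below. The log array is a hypothetical helper that records each notification as it arrives.

```typescript
import { Observable } from 'rxjs';

// Records every notification the Observer receives, in order.
const log: string[] = [];

const source = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // Not delivered: the execution has already completed.
});

source.subscribe({
  next: (value) => log.push(`next ${value}`),
  error: (err) => log.push(`error ${err}`),
  complete: () => log.push('complete'),
});

console.log(log); // [ 'next 1', 'next 2', 'complete' ]
```

The same holds if subscriber.error(...) is called instead of complete(): the Error notification is the last thing the Observer sees.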

  • Disposing Observables:
    Because Observable Executions may be infinite, and it's common for an Observer to want to abort execution in finite time, we need an API for canceling an execution. Since each execution is exclusive to one Observer only, once the Observer is done receiving values, it has to have a way to stop the execution, in order to avoid wasting computation power or memory resources.

import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
subscription.unsubscribe();

When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.
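
When you create an Observable with new Observable, the subscribe function can return a teardown function, which RxJS runs on unsubscribe(). The sketch below assumes a timer-based source; the names ticker and events are illustrative only.

```typescript
import { Observable } from 'rxjs';

// Records what happens, so the teardown can be observed.
const events: string[] = [];

const ticker = new Observable<string>((subscriber) => {
  const id = setInterval(() => subscriber.next('hi'), 1000);
  // The returned function is the teardown logic; it runs on unsubscribe().
  return () => {
    clearInterval(id);
    events.push('teardown');
  };
});

const subscription = ticker.subscribe((x) => events.push(x));

// Cancel right away, before the first tick: the timer is cleared and no 'hi' arrives.
subscription.unsubscribe();

console.log(events); // [ 'teardown' ]
console.log(subscription.closed); // true
```

Without the teardown function, the interval would keep firing after unsubscribe(), even though the Observer would no longer receive the values.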

Observer:

What is an Observer? An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete. The following is an example of a typical Observer object:

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

To use the Observer, provide it to the subscribe of an Observable:

observable.subscribe(observer);

Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.

NOTE: Observers in RxJS may also be partial. If you don't provide one of the callbacks, the execution of the Observable will still happen normally, except some types of notifications will be ignored, because they don't have a corresponding callback in the Observer.
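
A small sketch of partial Observers, using two hypothetical arrays to record what each one receives:

```typescript
import { of } from 'rxjs';

// Only a next callback: values are received, the Complete notification is ignored.
const values: number[] = [];
of(10, 20, 30).subscribe({
  next: (x) => values.push(x),
});

// Only a complete callback: the values are ignored, but the execution still runs.
const notifications: string[] = [];
of(10, 20, 30).subscribe({
  complete: () => notifications.push('complete'),
});

console.log(values); // [ 10, 20, 30 ]
console.log(notifications); // [ 'complete' ]
```

Passing a single function to subscribe, as in the earlier examples, is a shorthand for an Observer that has only a next callback.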

