The Complete Guide to RxJS: A to Z
RxJS (Reactive Extensions for JavaScript) is a library for working with asynchronous streams in a functional programming style. Its core abstraction, the Observable, is used to create, control, and transform streams of data.
RxJS is widely used in Angular but is also useful in vanilla JS, React, or any JavaScript application that deals with asynchronous data (e.g., WebSocket messages, HTTP requests, UI events).
Observable – The foundation of RxJS. It is an object that represents a stream of values delivered over time. You can "subscribe" to this stream and receive each value as it is emitted.
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
  subscriber.next('Hello');
  subscriber.next('RxJS');
  subscriber.complete();
});
observable.subscribe({
  next(value) { console.log(value); },
  complete() { console.log('Done'); }
});
next() – sends data
complete() – completes the stream
Observer – an object with next, error, and complete methods. You pass it to subscribe() to receive values from an Observable.
const observer = {
  next: x => console.log('Got value:', x),
  error: err => console.error('Error:', err),
  complete: () => console.log('Completed')
};
observable.subscribe(observer);
The most powerful elements in RxJS are **operators**. These are functions that, applied through pipe(), take an Observable and return a new one without modifying the source. Examples: map, filter, mergeMap, switchMap, debounceTime, take, and more.
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x * 10)
  )
  .subscribe(console.log);
Result: 20, 40
Subject is both an Observable and an Observer. You can call next() on it, and it will pass the value to all subscribers.
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe(x => console.log('First:', x));
subject.subscribe(x => console.log('Second:', x));
subject.next('Hello Subjects!');
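Both subscribers receive the same value. The output is "First: Hello Subjects!" followed by "Second: Hello Subjects!".
BehaviorSubject – a Subject that holds a current value. It requires an initial value and immediately emits the current value to every new subscriber.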
import { BehaviorSubject } from 'rxjs';
const behavior = new BehaviorSubject('Initial');
behavior.subscribe(val => console.log('A:', val));
behavior.next('Next Value');
behavior.subscribe(val => console.log('B:', val));
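The second subscriber also receives the latest value as soon as it subscribes. The output is "A: Initial", "A: Next Value", and then "B: Next Value".
ReplaySubject – a Subject that buffers a fixed number of past values and replays them to every new subscriber.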
import { ReplaySubject } from 'rxjs';
const replay = new ReplaySubject(2); // stores the last 2 values
replay.next(1);
replay.next(2);
replay.next(3);
replay.subscribe(val => console.log('Subscriber got:', val));
The subscriber receives 2 and 3; the value 1 has already dropped out of the buffer.
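switchMap – used for nested Observables (e.g., HTTP requests). It maps each source value to an inner Observable and unsubscribes from the previous inner Observable whenever a new source value arrives.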
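import { fromEvent, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
// Each click starts a new one-second interval and cancels the previous one.
fromEvent(document, 'click')
  .pipe(
    switchMap(() => interval(1000))
  )
  .subscribe(console.log);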
Only the interval started by the most recent click emits values. Each new click cancels the previous interval, so the count restarts from 0.
debounceTime – used to rate-limit typing input or other rapid events.
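import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';
// Assumes the page contains an <input> element.
const inputElement = document.querySelector('input');
fromEvent(inputElement, 'input')
  .pipe(
    debounceTime(300),
    map(e => e.target.value)
  )
  .subscribe(console.log);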
debounceTime waits until 300 ms have passed without a new input event, then emits only the latest value.
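catchError – intercepts an error from the source Observable and replaces it with a fallback Observable, so the subscriber never sees the error.
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError(() => new Error('Oops'))
  .pipe(
    catchError(err => {
      console.error('Caught:', err.message);
      return of('Fallback value');
    })
  )
  .subscribe(console.log);
// Logs "Caught: Oops", then "Fallback value"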
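finalize – runs a callback once the stream terminates, whether it completes, errors, or is unsubscribed.
import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';
of(1, 2, 3)
  .pipe(finalize(() => console.log('Finished')))
  .subscribe(console.log);
// Logs 1, 2, 3, then "Finished"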
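You can write your own operator by composing existing operators with `pipe`, or by writing a higher-order function that takes an Observable and returns a new one. The older `lift` method is deprecated as of RxJS 7 and is best avoided in new code.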