Learnitweb

Subject in RxJS

1. Introduction

An RxJS Subject is a special type of Observable that allows values to be multicast to many Observers. It differs from a plain Observable in that a plain Observable is unicast: each subscribed Observer gets its own independent execution of the Observable.

An RxJS Subject is like an EventEmitter: it maintains a list of Observers.

There are three subclasses of Subject in RxJS:

  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

2. Creating a Subject

You create a Subject by calling its constructor:

import { Subject } from "rxjs";

const subject = new Subject();

3. A Subject is an Observable

You can subscribe to a Subject by passing it an Observer, which then receives the values the Subject emits.
Unlike subscribing to a plain Observable, calling subscribe on a Subject does not start a new execution that delivers values; it only adds the Observer to the Subject's list of Observers.
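To make the "list of Observers" idea concrete, here is a minimal sketch of the registration mechanism. MiniSubject is a hypothetical, heavily simplified class written for this illustration; it is not the RxJS implementation and omits error(), complete(), and all safety checks.

```javascript
// Hypothetical, simplified stand-in for a Subject; not the RxJS source code.
class MiniSubject {
  constructor() {
    this.observers = []; // the list of registered Observers
  }

  // Registering an Observer starts no new execution; it only stores it.
  subscribe(observer) {
    this.observers.push(observer);
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== observer);
      },
    };
  }

  // A value fed in is forwarded to every Observer currently in the list.
  next(value) {
    for (const observer of [...this.observers]) {
      observer.next(value);
    }
  }
}

const received = [];
const mini = new MiniSubject();

const first = mini.subscribe({ next: (v) => received.push(`first: ${v}`) });
mini.subscribe({ next: (v) => received.push(`second: ${v}`) });

// Subscribing alone delivered nothing.
console.log(received.length); // 0

mini.next("a");
first.unsubscribe();
mini.next("b");

console.log(received); // [ 'first: a', 'second: a', 'second: b' ]
```

Nothing is delivered at subscription time; values flow only when next is called, and only to Observers that are in the list at that moment.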

4. A Subject is an Observer

An RxJS Subject is also an Observer: it has the methods next(v), error(e), and complete(). Calling next(v) feeds a new value to the Subject, which is then multicast to all registered Observers.

5. Example

import { Subject } from "rxjs";

const subject = new Subject();

//subject is an Observable, you can subscribe to it
subject.subscribe({
  next: (value) => console.log(`first observer: ${value}`),
});
subject.subscribe({
  next: (value) => console.log(`second observer: ${value}`),
});

//subject is an Observer, it has next(v), error(e), and complete()
subject.next(1);
subject.next(2);

Output

first observer: 1
second observer: 1
first observer: 2
second observer: 2

6. BehaviorSubject

A BehaviorSubject is a subclass of Subject that has the notion of a “current value”. It stores the latest value emitted to its Observers, and whenever a new Observer subscribes, that Observer immediately receives the current value. Because a current value must always exist, a BehaviorSubject is created with an initial value.
BehaviorSubject is useful for representing “values over time”, for example a stock's price at the end of each trading session.

import { BehaviorSubject } from "rxjs";
//create BehaviorSubject with initial value 0
const subject = new BehaviorSubject(0);

subject.subscribe({
  next: (value) => console.log(`first observer: ${value}`),
});

//feed new values
subject.next(1);
subject.next(2);

subject.subscribe({
  next: (value) => console.log(`second observer: ${value}`),
});

subject.next(3);

Output

first observer: 0
first observer: 1
first observer: 2
second observer: 2
first observer: 3
second observer: 3

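In this example, each observer receives the BehaviorSubject's current value at the moment it subscribes: the first observer gets the initial value 0, and the second observer gets 2, the latest value emitted before it subscribed.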

7. ReplaySubject

A ReplaySubject replays old values to new subscribers. It records values from the Observable execution in a buffer and emits them to each new subscriber. You can specify both how many values to keep and how long, in milliseconds, a value may stay in the buffer before it is discarded.

import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject(4); // buffer 4 values for new subscribers

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.next(5);
subject.next(6);

subject.subscribe({
  next: (value) => console.log(`observer: ${value}`),
});

Output

observer: 3
observer: 4
observer: 5
observer: 6

In this example, we created a ReplaySubject with a buffer size of 4. Six values were emitted before the Observer subscribed, but only the last four (3, 4, 5, and 6) were still in the buffer, so only those are replayed.

8. AsyncSubject

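An AsyncSubject delivers only the last value of the Observable execution to its Observers, and only when the execution completes. Unlike the other Subjects discussed so far, it emits nothing while values are still arriving: it holds on to the most recent value and sends it once complete() is called. If the execution completes without any value, no value is delivered.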

import { AsyncSubject } from "rxjs";
const subject = new AsyncSubject();

subject.subscribe({
  next: (value) => console.log(`first observer: ${value}`),
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (value) => console.log(`second observer: ${value}`),
});

subject.next(5);
subject.complete();

Output

first observer: 5
second observer: 5

Notice that both observers receive only the last value, 5, and only after complete() is called.