This project was generated with Angular CLI version 13.3.0.
Rx stands for Reactive Extensions, a family of libraries for reactive programming, that is, programming with asynchronous data streams.

A stream is data that arrives over a period of time. A stream can carry anything: variables, user inputs, cache entries, data structures, etc.
At any point, a data stream may emit any of the following three notifications:
- Value: the next value in the stream.
- Error: an error has stopped the stream.
- Complete: the stream has ended and no more values will arrive.
Reactive programming is all about creating streams, emitting value, error, or complete notifications, and manipulating and transforming the data.

RxJS (Reactive Extensions Library for JavaScript) is a JavaScript library that allows us to work with asynchronous data streams.

Angular uses the RxJS library heavily in its framework to implement reactive programming. For example:
- Reacting to an HTTP request in Angular by subscribing to its response.
- Value changes / status changes in reactive forms.
- Custom events that send output data as an Observable from a child component to a parent component.
Observables: an Observable is a function that emits single or multiple values over time, either synchronously or asynchronously.

Observers: an Observable on its own is useless unless someone consumes the values it emits. We call these consumers Observers or Subscribers.
- The simplest way to create an Observable is to use the `Observable` constructor. The constructor takes a callback function as its argument, and that callback receives a `subscriber`. The callback runs whenever this Observable's `subscribe` method executes.
- `next()`: the Observable invokes the `next()` callback whenever a value arrives in the stream. It passes the value as the argument to this callback.
- `error()`: sends a JS error / exception as the argument. No further values are emitted and the stream stops.
- `complete()`: the Observable invokes this when the stream completes. After the `complete()` notification, no more values are emitted to the subscriber.

Syntax:

    let obs = new Observable(subscriber => {
      console.log('Start emitting'); // runs once per subscribe() call
      subscriber.next('Hi');
    });
There are easier ways to create Observables using RxJS creation operators.

Observers communicate with the Observable using callbacks. When subscribing to the Observable, an Observer can pass three optional callbacks (`next`, `error`, and `complete`). We can pass these callbacks inside an object as the argument to the `subscribe()` method. If we only need the values emitted by the Observable, the `next` callback can be passed directly, without the object syntax.

Check `app.component.ts` for an implementation of subscribing to Observables.
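The two subscription styles described above can be sketched as follows. This is a minimal illustration, assuming RxJS 7 (the version that ships with Angular 13); `greeting$` and `received` are hypothetical names and are not taken from `app.component.ts`.

```typescript
import { Observable } from 'rxjs';

// Collects every notification so the order is easy to inspect.
const received: string[] = [];

// Hypothetical stream: two values, then a complete notification.
const greeting$ = new Observable<string>(subscriber => {
  subscriber.next('Hi');
  subscriber.next('there');
  subscriber.complete();
  subscriber.next('ignored'); // nothing is delivered after complete()
});

// Object syntax: all three callbacks are optional.
greeting$.subscribe({
  next: value => received.push(`next: ${value}`),
  error: err => received.push(`error: ${err}`),
  complete: () => received.push('complete'),
});

// Shorthand: only the next callback, passed without the object syntax.
greeting$.subscribe(value => console.log(value));
```

Each `subscribe()` call runs the constructor callback again, so the shorthand subscriber receives its own `'Hi'` and `'there'`.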
Operators are functions that operate on Observables and return a new Observable. With them we can manipulate an incoming Observable, filter it, merge it with another Observable, or subscribe to another Observable.

We can also chain operators one after the other using `pipe`. Each operator in the chain receives the Observable produced by the previous operator, modifies it, and creates a new Observable, which becomes the input for the next operator.

The following table lists some of the commonly used operators.
Operation | Operators |
---|---|
Combination | combineLatest, concat, merge, startWith, withLatestFrom, zip |
Filtering | debounceTime, distinctUntilChanged, filter, take, takeUntil, takeWhile, takeLast, first, last, single, skip, skipUntil, skipWhile, skipLast |
Transformation | bufferTime, concatMap, map, mergeMap, scan, switchMap, exhaustMap, reduce |
Utility | tap, delay, delayWhen |
Error Handling | throwError, catchError, retry, retryWhen |
Multicasting | share |
- `Observable.create()`: calls the `Observable` constructor behind the scenes. `create` is a static method of `Observable`, so it does not need a separate import. This method is deprecated; use the constructor instead.
- `of` creates an Observable from the arguments we pass into it. We can pass any number of arguments to `of`. Each argument is emitted one after the other, and the `complete` signal is sent at the end.
- `from` takes only one argument, which must be something that can be iterated, and converts it into an Observable. It also sends the `complete` signal at the end.

Example with an array: `from(['a', 'b', 'c'])` => 'a' -> 'b' -> 'c' -> complete. Example with a string: `from('Hello')` => 'H' -> 'e' -> 'l' -> 'l' -> 'o' -> complete.

Observables from collections: anything that can be iterated can be converted into an Observable using the `from` operator.
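The difference between `of` and `from` is easiest to see side by side. The following is a small sketch, assuming RxJS 7; the variable names are illustrative only.

```typescript
import { from, of } from 'rxjs';

// of: every argument becomes one emission, then complete.
const fromOf: Array<number | string> = [];
of(1, 2, 3).subscribe({
  next: value => fromOf.push(value),
  complete: () => fromOf.push('complete'),
});

// of with a single array argument emits the whole array as one value.
const ofArray: number[][] = [];
of([1, 2, 3]).subscribe(value => ofArray.push(value));

// from: the single argument is unpacked element by element.
const fromArray: number[] = [];
from([1, 2, 3]).subscribe(value => fromArray.push(value));

// A string is unpacked character by character.
const fromString: string[] = [];
from('Hello').subscribe(char => fromString.push(char));
```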
- The `fromEvent` method allows us to create an Observable from DOM events directly.
- Arguments: the event target element first, the event name second.

Syntax:

    fromEvent(this.button.nativeElement, 'click').subscribe({next: () => {}, complete: () => {}})

When we subscribe to an Observable created with the `fromEvent` method, it registers the event handler on the DOM element using `addEventListener`.
The `pipe` method of an RxJS Observable is used to chain multiple operators together. RxJS operators are functions that take an Observable as input, transform it into a new Observable, and return it.

Each argument of the `pipe` method must be separated by a comma. The order of the operators is important because, when a user subscribes to the Observable, the operators execute in the order in which they were added.

There are two ways to use pipe: as an instance method of an Observable, or as a standalone function.

We chain the operators `op1`, `op2`, etc. by passing them as arguments to the `pipe` method. The output of `op1` becomes the input of `op2`.

    obs.pipe(
      op1(),
      op2(),
      op3(),
    )
Note: if the source emits multiple values, each value goes through the entire operator chain and is delivered to the subscriber; only then is the next value streamed.

Refer to the `pipeOperatorsUsingFilterMap()` method.

We can also use `pipe` as a standalone function and reuse the resulting operator chain in other places. To do so, we need to import `pipe` from `rxjs`. Check the `reusablePipe` method for custom pipe creation.
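A reusable pipe and the value-by-value flow through the chain might look like the sketch below. It assumes RxJS 7; `evenTimesTen`, `trace`, and `results` are hypothetical names and this is not the project's `reusablePipe` implementation.

```typescript
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Records the order in which each operator sees each value.
const trace: string[] = [];

// Standalone pipe: a reusable chain that keeps even numbers and multiplies them by 10.
const evenTimesTen = pipe(
  filter((n: number) => {
    trace.push(`filter ${n}`);
    return n % 2 === 0;
  }),
  map((n: number) => {
    trace.push(`map ${n}`);
    return n * 10;
  }),
);

const results: number[] = [];
of(1, 2, 3, 4)
  .pipe(evenTimesTen) // the reusable chain is passed like any other operator
  .subscribe(n => {
    trace.push(`got ${n}`);
    results.push(n);
  });

// results is [20, 40]; trace shows that 2 reaches the subscriber before 3 enters the chain.
```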
`tap`: the tap operator returns a new Observable that mirrors the source Observable. It is mostly used for debugging, and it does not modify the stream in any way.

Example: logging the values of an Observable. Refer to the `tapObservables()` method.
`map`: transforms each value emitted by the source Observable by applying a function to it. It can be used with HTTP requests, with DOM events, to reshape input data, etc.

Arguments: `map(project)`, where the `project` function receives the `value` emitted by the Observable and an optional `index` (0 for the first value emitted, incremented by one for every subsequent value).

Note: the `KeyValuePipe` from `@angular/common` can transform an object into an array of key-value pairs.

    // assumes KeyValuePipe has been injected as this.keyValuePipe
    const obj = {person1: 'jon', person2: 'hopper', person3: 'mona'};
    const transformObj = this.keyValuePipe.transform(obj);

Result: [ { "key": "person1", "value": "jon" }, { "key": "person2", "value": "hopper" }, { "key": "person3", "value": "mona" } ]

- We can also use multiple `map` operators within the same pipe.

`filter`: one of the most widely used operators; it emits only the items from the source that satisfy a condition.
The `switchMap` operator maps each value from the source Observable to an inner Observable, subscribes to that inner Observable, and emits the values from it.

The function passed to `switchMap` must return an Observable.

`map` emits whatever its function returns (if that is an Observable, the Observable itself is emitted as a value), whereas `switchMap` subscribes to the inner Observable and emits the values from it.

    someStream$.pipe(
      switchMap(args => makeApiCall(args)), // must return a stream
      map(response => process(response)) // returns a value of any shape, usually an object or a primitive
    ).subscribe(doSomethingWithResults);

Example use case: this works well for scenarios like a form input or search input, where you are no longer concerned with the response of the previous request once a new input arrives.

The main difference between `switchMap` and other flattening operators is the cancelling effect. On each emission, the previous inner Observable (the result of the function you supplied) is cancelled and the new Observable is subscribed. You can remember this by the phrase "switch to a new observable".
`mergeMap`: this operator is best used when you wish to flatten an inner Observable but want to manually control the number of inner subscriptions.

In contrast to `switchMap`, `mergeMap` allows multiple inner subscriptions to be active at a time.

If the order of emission and subscription of inner Observables is important, try `concatMap`. `mergeMap` never cancels an inner Observable.

Memory leaks: using the `mergeMap` operator can often lead to memory leaks, since it allows multiple long-lived inner subscriptions, so make sure to limit them with operators like `take` or `takeUntil`.
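The cancelling behaviour can be made visible without timers by using Subjects as hand-driven inner streams. This is only a sketch, assuming RxJS 7; the helper `run` and the keys `'a'` and `'b'` are made up for illustration.

```typescript
import { Observable, Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

type Key = 'a' | 'b';

// Runs the same scenario with either flattening operator and returns what the subscriber saw.
function run(flatten: 'switch' | 'merge'): string[] {
  const source$ = new Subject<Key>();
  const inner = { a: new Subject<string>(), b: new Subject<string>() };
  const toInner = (key: Key): Observable<string> => inner[key];
  const operator = flatten === 'switch' ? switchMap(toInner) : mergeMap(toInner);

  const seen: string[] = [];
  source$.pipe(operator).subscribe(value => seen.push(value));

  source$.next('a');   // subscribes to inner.a
  inner.a.next('a1');
  source$.next('b');   // switchMap unsubscribes from inner.a here; mergeMap keeps it
  inner.a.next('a2');  // dropped by switchMap, delivered by mergeMap
  inner.b.next('b1');
  return seen;
}

const switched = run('switch'); // ['a1', 'b1']
const merged = run('merge');    // ['a1', 'a2', 'b1']
```

With `mergeMap` both inner subscriptions stay alive until they complete or the outer subscription ends, which is why long-lived inner streams should be bounded.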
Why use `take`?
- When you are interested in only the first emission, you want to use `take`. Maybe you want to see what the user first clicked on when they entered the page; you would subscribe to the click event and just take the first emission.
- Another use case is when you need to take a snapshot of data at a particular point in time but do not require further emissions. For example, a stream of user token updates, or a route guard based on a stream in an Angular application.

💡 If you want to take a number of values based on some logic, or another observable, you can use `takeUntil` or `takeWhile`! 💡

`take` is the opposite of `skip`: `take` emits the first n emissions, while `skip` skips the first n emissions.

    obs.pipe(take(2)).subscribe()
The `takeUntil` operator returns an Observable that emits values from the source Observable until the notifier Observable emits a value.

    takeUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T>

We must pass a notifier Observable as the argument to the `takeUntil` operator.
- `takeUntil` emits the values from the source Observable as long as it does not receive any value from the notifier Observable.
- When the notifier emits a value, `takeUntil` completes the resulting Observable and unsubscribes from the source.

Check the sample code in `transform.component.ts`.
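A common way to use `takeUntil` is with a Subject as the notifier. The sketch below assumes RxJS 7; `source$` and `destroy$` are hypothetical names and this is not the code from `transform.component.ts`.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>();
const destroy$ = new Subject<void>(); // notifier, e.g. triggered when a component is destroyed

const seen: Array<number | string> = [];
source$.pipe(takeUntil(destroy$)).subscribe({
  next: value => seen.push(value),
  complete: () => seen.push('complete'),
});

source$.next(1);
source$.next(2);
destroy$.next();   // the notifier emits: the piped Observable completes
source$.next(3);   // no longer delivered

// seen is [1, 2, 'complete']
```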
The `takeWhile` operator keeps emitting values from the source Observable as long as they pass the given condition (predicate). When it receives a value that does not satisfy the condition, it completes the Observable.

The difference from `filter` is that `takeWhile` discards the rest of the stream once it receives the first value that does not satisfy the condition, whereas the `filter` operator never stops the Observable.
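The contrast between `takeWhile` and `filter` shows up as soon as a passing value follows a failing one. A minimal sketch, assuming RxJS 7 and a made-up `readings$` stream:

```typescript
import { of } from 'rxjs';
import { filter, takeWhile } from 'rxjs/operators';

const readings$ = of(1, 2, 5, 1, 2);
const isSmall = (n: number): boolean => n < 3;

// takeWhile completes at the first failing value (5) and ignores everything after it.
const taken: Array<number | string> = [];
readings$.pipe(takeWhile(isSmall)).subscribe({
  next: value => taken.push(value),
  complete: () => taken.push('complete'),
});

// filter drops only the failing value and keeps listening.
const filtered: number[] = [];
readings$.pipe(filter(isSmall)).subscribe(value => filtered.push(value));

// taken is [1, 2, 'complete']; filtered is [1, 2, 1, 2]
```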
The `takeLast` operator emits the last n values from the source Observable once the source completes.

The `first` / `last` operators emit the first / last value that matches the condition, if a condition is given. If no condition is given, they emit the first / last value they receive. An error notification is sent if the source completes without emitting a matching value.
The skip operators skip values from the source Observable: `skip` by count, `skipUntil` until a notifier Observable emits, and `skipWhile` based on a condition. `skip`, `skipUntil`, and `skipWhile` skip values from the start of the source, while the `skipLast` operator skips elements from the end of the source.

`filter` emits a value if the predicate (condition) is true. `skipWhile` skips values while the predicate is true; once the predicate returns false, it emits every remaining value.
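The skip, `takeLast`, and `first` / `last` operators can be compared on one source. This is a sketch assuming RxJS 7; `collect` is a hypothetical helper that gathers the emissions of a synchronous Observable into an array.

```typescript
import { Observable, of } from 'rxjs';
import { first, last, skip, skipLast, skipWhile, takeLast } from 'rxjs/operators';

// Hypothetical helper: subscribes and returns everything a synchronous stream emits.
function collect<T>(stream$: Observable<T>): T[] {
  const values: T[] = [];
  stream$.subscribe(value => values.push(value));
  return values;
}

const numbers$ = of(1, 2, 3, 4, 5, 1);

const skipped = collect(numbers$.pipe(skip(2)));                              // [3, 4, 5, 1]
const skippedLast = collect(numbers$.pipe(skipLast(2)));                      // [1, 2, 3, 4]
const skippedWhile = collect(numbers$.pipe(skipWhile((n: number) => n < 3))); // [3, 4, 5, 1]
const lastTwo = collect(numbers$.pipe(takeLast(2)));                          // [5, 1]
const firstOverTwo = collect(numbers$.pipe(first((n: number) => n > 2)));     // [3]
const lastValue = collect(numbers$.pipe(last()));                             // [1]
```

Note that `skipWhile` lets the trailing `1` through: once the predicate has returned false, it is no longer consulted.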
Subjects are special Observables that act as both Observer and Observable. They allow us to emit new values into the Observable stream using the `next` method.
- All the subscribers who subscribe to the subject share the same instance of the subject and hence receive the same values.
- A Subject is a special type of Observable that allows values to be multicast to many observers.

A Subject implements the `subscribe` method as well as `next`, `error`, and `complete`.

    subject$ = new Subject();
    ngOnInit() {
      this.subject$.subscribe(val => console.log(val)); // logs 1, then 2
      this.subject$.next(1);
      this.subject$.next(2);
      this.subject$.complete();
    }
Observables are classified into two groups:
- Cold Observable
- Hot Observable

A cold Observable does not activate the producer until there is a subscriber. The producer emits values only when a subscriber subscribes to it.

A hot Observable does not wait for a subscriber before emitting data. It can start emitting values right away.

    subject$ = new Subject();
    ngOnInit() {
      this.subject$.next(1);
      this.subject$.next(2);
      this.subject$.complete();
    }

In the above example, since there were no subscribers, no one receives the data, but that did not stop the subject from emitting it.
Now consider the following example. Here the values 1 and 2 are lost because the subscription happens after the subject emits them.

    ngOnInit() {
      this.subject$.next(1);
      this.subject$.next(2);
      this.subject$.subscribe(val => console.log(val)); // logs only 3 and 4
      this.subject$.next(3);
      this.subject$.next(4);
      this.subject$.complete();
    }
An Observer needs to implement the `next`, `error`, and `complete` callbacks (all optional) to become an Observer.

    let obs$ = new Observable(observer => {
      observer.next(1);
      observer.error('error');
    });
    this.subject$.subscribe({
      next: val => console.log(val),
      error: err => console.error(err), // handles the error forwarded by the subject
    });
    obs$.subscribe(this.subject$);

Since `subject$` implements the `next` method, it receives the values from the Observable and emits them to its subscribers. So we can subscribe to an Observable and use `subject$` as the observer.
Another important distinction between an Observable and a Subject is that Subjects are multicast.
- More than one subscriber can subscribe to a Subject. They share the same execution, so all subscribers receive the same event when the Subject emits it.
- Multiple observers of a plain Observable each receive a separate execution of the Observable (unicast).

Check the `uniCastVsMultiCast` method in `subject.component.ts`.
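One way to observe the difference is to count how often the producer runs. The following is a sketch under the assumption of RxJS 7; it is not the project's `uniCastVsMultiCast` method, and all names are illustrative.

```typescript
import { Observable, Subject } from 'rxjs';

// Unicast: the producer function runs once per subscriber.
let executions = 0;
const unicast$ = new Observable<number>(subscriber => {
  executions += 1;
  subscriber.next(executions);
});

const unicastA: number[] = [];
const unicastB: number[] = [];
unicast$.subscribe(value => unicastA.push(value)); // first execution emits 1
unicast$.subscribe(value => unicastB.push(value)); // second execution emits 2

// Multicast: one next() call reaches every current subscriber.
const multicast$ = new Subject<number>();
const multicastA: number[] = [];
const multicastB: number[] = [];
multicast$.subscribe(value => multicastA.push(value));
multicast$.subscribe(value => multicastB.push(value));
multicast$.next(42);

// unicastA is [1], unicastB is [2]; multicastA and multicastB are both [42]
```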
Whenever a subscriber subscribes to a Subject, the Subject adds it to an array of subscribers. This way the Subject keeps track of all its subscribers and emits each event to all of them.
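The bookkeeping described above can be illustrated with a deliberately tiny stand-in. `TinySubject` is a hypothetical class written for this explanation; it is not how RxJS implements `Subject`, and it omits error and complete handling.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical, simplified stand-in for a Subject: it only tracks listeners and forwards values.
class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  // Adds the listener to the array and returns a function that removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(current => current !== listener);
    };
  }

  // Forwards the value to every listener registered at the time of the call.
  next(value: T): void {
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

const tiny = new TinySubject<number>();
const firstSeen: number[] = [];
const secondSeen: number[] = [];

const unsubscribeFirst = tiny.subscribe(value => firstSeen.push(value));
tiny.subscribe(value => secondSeen.push(value));

tiny.next(1);        // both listeners receive 1
unsubscribeFirst();  // removes the first listener from the array
tiny.next(2);        // only the second listener receives 2
```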
- BehaviorSubject
- ReplaySubject
- AsyncSubject

`BehaviorSubject` requires an initial value. It stores the current value and emits it to new subscribers.

    subject$ = new BehaviorSubject(0);
    subject$.subscribe(val => console.log(val)); // 0
    subject$.next(1); // 1

A `BehaviorSubject` always remembers the last emitted value and shares it with new subscribers.
`ReplaySubject` replays old values to new subscribers when they first subscribe.
- The `ReplaySubject` stores every value it emits in a buffer. We can configure the buffer using the `bufferSize` and `windowTime` arguments.

`bufferSize`: the number of items that the `ReplaySubject` keeps in its buffer. It defaults to infinity.

`windowTime`: the amount of time, in milliseconds, to keep a value in the buffer. It also defaults to infinity.

Because the `ReplaySubject` stores the values in a buffer, subscribers still receive them even when the subscription happens after the values were emitted.
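A short sketch of the buffer in action, assuming RxJS 7 and a hypothetical `lastTwo$` subject with a `bufferSize` of 2:

```typescript
import { ReplaySubject } from 'rxjs';

// Keep only the two most recent values in the buffer.
const lastTwo$ = new ReplaySubject<number>(2);

lastTwo$.next(1);
lastTwo$.next(2);
lastTwo$.next(3);

// Subscribing late: the buffered values 2 and 3 are replayed immediately.
const lateSubscriber: number[] = [];
lastTwo$.subscribe(value => lateSubscriber.push(value));

// New values are delivered as usual.
lastTwo$.next(4);

// lateSubscriber is [2, 3, 4]
```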
`AsyncSubject` emits only the latest value, and only when it completes. If it errors out, it emits the error but does not emit any more values.

Check the `asyncSubjectDemo` method in `subject.component.ts`.
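The "only on complete" behaviour can be sketched as follows. This assumes RxJS 7 and is not the project's `asyncSubjectDemo` method; `result$` and the arrays are illustrative names.

```typescript
import { AsyncSubject } from 'rxjs';

const result$ = new AsyncSubject<number>();

const early: Array<number | string> = [];
result$.subscribe({
  next: value => early.push(value),
  complete: () => early.push('complete'),
});

result$.next(1);
result$.next(2);
const deliveredBeforeComplete = early.length; // 0: nothing is emitted yet

result$.complete(); // now the latest value (2) is emitted, followed by complete

// A subscriber that arrives after completion still receives the final value.
const late: number[] = [];
result$.subscribe(value => late.push(value));

// early is [2, 'complete']; late is [2]
```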
The `scan` and `reduce` operators in RxJS apply an accumulator function to the values of the source Observable. The `scan` operator emits every intermediate result of the accumulation, while `reduce` emits only the final result, once the source completes. Both also accept an optional seed value as the initial value.
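Summing a short stream shows the difference. A minimal sketch, assuming RxJS 7, with a seed of 0 and hypothetical variable names:

```typescript
import { of } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4);
const add = (total: number, value: number): number => total + value;

// scan emits the running total after every source value.
const runningTotals: number[] = [];
numbers$.pipe(scan(add, 0)).subscribe(total => runningTotals.push(total));

// reduce emits a single value when the source completes.
const finalTotal: number[] = [];
numbers$.pipe(reduce(add, 0)).subscribe(total => finalTotal.push(total));

// runningTotals is [1, 3, 6, 10]; finalTotal is [10]
```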
The `debounce` and `debounceTime` operators both emit a value from the source Observable only after a certain amount of time has elapsed without another emission. Both emit only the latest value and discard any intermediate values.

Typeahead/autocomplete fields are one of the most common use cases for the debounce operators.
- As the user types in the typeahead field, we need to listen to it and send an HTTP request to the back end to get a list of possible values. If we send an HTTP request for every keystroke, we end up making numerous calls to the server.
- By using the debounce operators, we wait until the user pauses typing before sending an HTTP request. This eliminates unnecessary HTTP requests.
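The typeahead idea can be sketched with a Subject standing in for the input field. This assumes RxJS 7; `keystrokes$` and `requests` are hypothetical, and pushing to an array stands in for the HTTP call. To keep the sketch free of timers, it completes the source, which makes `debounceTime` flush the pending value right away; in a real field the value would instead be emitted after 300 ms of silence.

```typescript
import { Subject } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// Stand-in for the stream of values typed into a typeahead field.
const keystrokes$ = new Subject<string>();

// Stand-in for the HTTP requests that would be sent to the back end.
const requests: string[] = [];
keystrokes$
  .pipe(debounceTime(300)) // wait for a 300 ms pause before passing a value on
  .subscribe(term => requests.push(term));

// Three quick keystrokes: no request is sent while the user is still typing.
keystrokes$.next('a');
keystrokes$.next('an');
keystrokes$.next('ang');
const sentWhileTyping = requests.length; // 0

// Completing the source flushes the latest pending value; the intermediate ones are discarded.
keystrokes$.complete();

// requests is ['ang']
```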