Observables in Angular 2 are provided by RxJS (Reactive Extensions for JavaScript). Observables are a proposed addition to JavaScript rather than a shipped language feature, so you need an external library to use them today. RxJS is a library for working with asynchronous data streams. So what are asynchronous data streams?
- Asynchronous – we can call a function and register a callback to be notified when the results are available, so execution continues and the web page stays responsive. This pattern is used for Ajax calls, DOM events, Promises, Web Workers and WebSockets.
- Data – raw information in the form of JavaScript data types such as Number, String and Object (including Arrays, Sets and Maps).
- Streams – sequences of data made available over time. Technically, almost anything can be modeled as a stream.
Observables help manage asynchronous data and support a few other useful patterns. They are similar to Promises but with a few key differences. The first is that Observables can emit multiple values over time, whereas a Promise, once created, always settles with one value or one error. That is fine until you need multiple values over time: WebSocket or other real-time data and event handlers can emit any number of values. This is where Observables really shine, and they are used extensively in Angular 2.
| Observables | Promise |
| --- | --- |
| Observables handle multiple values over time | Promises are only called once and will return a single value |
| Observables are cancellable | Promises are not cancellable |
Because observables can handle multiple values over time, they are a good fit for real-time data, events and any other kind of stream. Being able to cancel them gives better control over the inflow of values from a stream. The common example is an auto-complete widget that sends a request on every keystroke: earlier requests can be cancelled once a newer one supersedes them.
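To make "multiple values" and "cancellable" concrete, here is a minimal sketch in plain TypeScript. It does not use RxJS: `KeystrokeStream`, `StreamSubscription` and `emit` are hypothetical stand-ins that only mimic the subscribe/unsubscribe shape of an observable.

```typescript
// Hypothetical stand-in for an observable stream; this is NOT the RxJS API.
type Listener<T> = (value: T) => void;

interface StreamSubscription {
  unsubscribe(): void;
}

class KeystrokeStream {
  private listeners: Listener<string>[] = [];

  // Register a listener and return a handle that cancels it.
  subscribe(listener: Listener<string>): StreamSubscription {
    this.listeners.push(listener);
    return {
      unsubscribe: () => {
        this.listeners = this.listeners.filter((l) => l !== listener);
      },
    };
  }

  // Push a new value to every listener that is still subscribed.
  emit(value: string): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const keystrokes = new KeystrokeStream();
const received: string[] = [];

const subscription = keystrokes.subscribe((term) => received.push(term));

keystrokes.emit("a");
keystrokes.emit("an");
subscription.unsubscribe(); // cancel: values emitted from now on are ignored
keystrokes.emit("ang");

console.log(received); // holds "a" and "an", but not "ang"
```

A Promise has no equivalent of `unsubscribe()`: once created it settles exactly once, and a consumer cannot stop it.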
| | RxJS Observable | Promise |
| --- | --- | --- |
| Execution | Lazy | Eager |
| Asynchronous | YES | YES |
| Handles data sources that produce ONE value | YES | YES |
| Handles data sources that produce MULTIPLE values | YES | NO |
| Debouncing & Throttling | YES | NO |
| Can be cached | YES | NO |
| Can be cancelled | YES | NO |
| Retry from failure | YES | NO |
RxJS also provides Observable operators that you can use to manipulate the data being emitted. Some of the common operators are:
- Map
- Filter
- Take
- Skip
- Debounce
- Retry
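As a rough illustration of what a few of these operators do to the emitted values, the sketch below models a stream as a plain function and chains three transformations. It is dependency-free TypeScript, not RxJS: `ValueStream`, `fromArray`, `mapStream`, `filterStream` and `takeStream` are hypothetical helpers, and real operators also deal with errors, completion and unsubscription, which are omitted here.

```typescript
// Hypothetical, dependency-free stand-ins; this is NOT the RxJS implementation.
// A stream is modeled as a function that pushes values to a callback.
type ValueStream<T> = (next: (value: T) => void) => void;

function fromArray<T>(items: T[]): ValueStream<T> {
  return (next) => items.forEach((item) => next(item));
}

// map: transform each emitted value.
function mapStream<T, R>(source: ValueStream<T>, project: (value: T) => R): ValueStream<R> {
  return (next) => source((value) => next(project(value)));
}

// filter: only pass on values that satisfy the predicate.
function filterStream<T>(source: ValueStream<T>, keep: (value: T) => boolean): ValueStream<T> {
  return (next) =>
    source((value) => {
      if (keep(value)) {
        next(value);
      }
    });
}

// take: pass on at most `count` values (a real operator would also unsubscribe).
function takeStream<T>(source: ValueStream<T>, count: number): ValueStream<T> {
  return (next) => {
    let seen = 0;
    source((value) => {
      if (seen < count) {
        seen += 1;
        next(value);
      }
    });
  };
}

const numbers = fromArray([1, 2, 3, 4, 5, 6]);
const evens = filterStream(numbers, (n) => n % 2 === 0); // 2, 4, 6
const scaled = mapStream(evens, (n) => n * 10);          // 20, 40, 60
const firstTwo = takeStream(scaled, 2);                  // 20, 40

const output: number[] = [];
firstTwo((value) => output.push(value));
console.log(output); // holds 20 and 40
```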
So in Angular 2, your services will return observables instead of promises. In the category service shown below, getCategories returns an observable.
import { Injectable, Inject, OpaqueToken } from '@angular/core';
import { Http, Response } from '@angular/http';
import { Category } from './category';
import { Observable } from 'rxjs/Observable';

// Token used to inject the application configuration.
export let INTERVIEW_APP_CONFIG = new OpaqueToken('app.config');

export interface ApplicationConfiguration {
  apiEndPoint: string;
  timeOut: number;
}

export const INTERVIEW_APP_DI_CONFIG: ApplicationConfiguration = {
  apiEndPoint: 'http://app.cloudapp.net/api/Category',
  timeOut: 20
};

@Injectable()
export class CategoryService {
  categoryServiceUrl: string;

  constructor(private http: Http, @Inject(INTERVIEW_APP_CONFIG) config: ApplicationConfiguration) {
    console.log(config.apiEndPoint);
    this.categoryServiceUrl = config.apiEndPoint;
  }

  // Returns an observable; the request is only sent once someone subscribes.
  getCategories(): Observable<Category[]> {
    console.log("Get Categories");
    return this.http.get(this.categoryServiceUrl)
      .map(this.extractCategoryData)
      .catch(this.handleError);
  }

  private extractCategoryData(res: Response) {
    let body = res.json();
    return body || { };
  }

  private handleError(error: any) {
    let errMsg = (error.message) ? error.message :
      error.status ? `${error.status} - ${error.statusText}` : 'Server error';
    console.error(errMsg); // log to console instead
    return Observable.throw(errMsg);
  }
}
An observable starts executing only when an observer subscribes to it. This is a significant difference from promises: the function passed to a promise when it is created is always executed immediately, even if no listener is registered. In other words, promises don’t wait for subscribers to be ready to receive and handle the response.
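The difference can be seen without any library. The following is a minimal sketch, assuming a hand-written `LazyStream` class (hypothetical, not the RxJS `Observable`) next to a standard `Promise`; `executionLog` is only there to record the order in which things run.

```typescript
// Hypothetical stand-in for a lazy observable; this is NOT the RxJS API.
class LazyStream<T> {
  private readonly producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  // The producer only runs when someone subscribes.
  subscribe(next: (value: T) => void): void {
    this.producer(next);
  }
}

const executionLog: string[] = [];

// The Promise executor runs immediately, even with no `.then()` attached yet.
const eager = new Promise<number>((resolve) => {
  executionLog.push("promise executor ran");
  resolve(1);
});

// Creating the stream runs nothing yet.
const lazy = new LazyStream<number>((next) => {
  executionLog.push("stream producer ran");
  next(1);
});

// Snapshot taken before anyone subscribes: only the promise has done work.
const logBeforeSubscribe = executionLog.slice();

lazy.subscribe((value) => executionLog.push(`stream emitted ${value}`));

eager.then((value) => console.log("promise resolved with", value));
console.log(logBeforeSubscribe); // holds only "promise executor ran"
console.log(executionLog);
```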
Observables are lazy, so we have to subscribe a callback to make them run their initialization logic. The component below subscribes to the observable returned by the service:
import { Component, OnInit } from '@angular/core';
import { Category } from './category';
import './rxjs-operators';
import { CategoryService, INTERVIEW_APP_CONFIG, INTERVIEW_APP_DI_CONFIG } from './categoryService';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
  providers: [CategoryService, { provide: INTERVIEW_APP_CONFIG, useValue: INTERVIEW_APP_DI_CONFIG }]
})
export class AppComponent implements OnInit {
  title = 'app works!';
  firstName = "Aamol";
  errorMessage: string;
  categories: Category[];

  constructor(private categoryService: CategoryService) {
  }

  ngOnInit() {
    this.getCategories();
    console.log("Getting categories");
  }

  getCategories() {
    // Subscribing is what actually triggers the HTTP request.
    this.categoryService.getCategories()
      .subscribe(
        categories => this.categories = categories,
        error => this.errorMessage = error
      );
  }
}
Observables allow you to register callbacks as shown above. The subscribe method accepts three callbacks as parameters:
- The onNext callback that will be called when an event is triggered.
- The onError callback that will be called when an error is thrown.
- The onCompleted callback that will be called when the observable completes.
Here is the way to register callbacks on an observable:
this.categoryService.getCategories().subscribe(
  (event) => {
    // handle events
  },
  (error) => {
    // handle error
  },
  () => {
    // handle completion
  }
);
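The order in which these callbacks fire can be sketched without RxJS. In the example below, `StreamObserver` and `emitAll` are hypothetical helpers written for illustration; they assume the usual contract that a stream delivers zero or more values and then either completes or errors, never both.

```typescript
// Hypothetical minimal types; RxJS ships its own Observer and Observable.
interface StreamObserver<T> {
  next: (value: T) => void;
  error: (err: string) => void;
  complete: () => void;
}

// Emits every item and then completes; if `failAt` is reached, reports an error instead.
function emitAll<T>(items: T[], observer: StreamObserver<T>, failAt?: number): void {
  let index = 0;
  for (const item of items) {
    if (index === failAt) {
      observer.error(`failed at index ${index}`);
      return; // after an error nothing else is delivered, not even complete
    }
    observer.next(item);
    index += 1;
  }
  observer.complete();
}

const events: string[] = [];
const recorder: StreamObserver<string> = {
  next: (value) => events.push(`next:${value}`),
  error: (err) => events.push(`error:${err}`),
  complete: () => events.push("complete"),
};

emitAll(["Angular", "RxJS"], recorder);    // two values, then completion
emitAll(["Angular", "RxJS"], recorder, 1); // one value, then an error

console.log(events);
```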
The Observable class provides a static fromPromise method that creates an observable from a promise. This lets you make a promise part of an asynchronous data stream.
Retrying requests – The retry operator resubscribes to the source observable when it errors, up to the specified number of times or until it completes successfully. In the context of HTTP requests, this lets you transparently re-execute requests that failed.
getCategories(): Observable<Category[]> {
  console.log("Get Categories");
  return this.http.get(this.categoryServiceUrl)
    .retry(4) // re-send a failed request up to 4 more times
    .map(this.extractCategoryData)
    .catch(this.handleError);
}
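To show what resubscribing means, here is a minimal sketch of the retry idea in plain TypeScript. `Producer`, `AttemptObserver`, `withRetry` and the simulated requests are all hypothetical and are not how RxJS implements the operator; the sketch assumes a source that restarts from scratch on every subscription, so a retry count of 4 allows up to 5 attempts in total.

```typescript
// Hypothetical minimal types; this is NOT the RxJS implementation.
interface AttemptObserver<T> {
  next: (value: T) => void;
  error: (err: string) => void;
}

// A producer is re-run from scratch on every subscription.
type Producer<T> = (observer: AttemptObserver<T>) => void;

// Re-subscribe to `source` up to `count` more times after a failure.
function withRetry<T>(source: Producer<T>, count: number): Producer<T> {
  return (observer) => {
    let retriesLeft = count;
    const attempt = (): void => {
      source({
        next: observer.next,
        error: (err) => {
          if (retriesLeft > 0) {
            retriesLeft -= 1;
            attempt();
          } else {
            observer.error(err); // retries exhausted: surface the error
          }
        },
      });
    };
    attempt();
  };
}

// Simulated request that fails twice before succeeding.
let requestCount = 0;
const flakyRequest: Producer<string> = (observer) => {
  requestCount += 1;
  if (requestCount < 3) {
    observer.error(`attempt ${requestCount} failed`);
  } else {
    observer.next("categories loaded");
  }
};

// Simulated request that never succeeds.
let failingCount = 0;
const alwaysFails: Producer<string> = (observer) => {
  failingCount += 1;
  observer.error("server error");
};

const results: string[] = [];
const collector: AttemptObserver<string> = {
  next: (value) => results.push(value),
  error: (err) => results.push(`gave up: ${err}`),
};

withRetry(flakyRequest, 4)(collector); // succeeds on the third attempt
withRetry(alwaysFails, 4)(collector);  // 1 initial attempt + 4 retries, then the error

console.log(requestCount, failingCount, results);
```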
Enable RxJS Operators – The RxJS library is quite large, so Angular 2 exposes a stripped-down version of Observable in the rxjs/Observable module. This version lacks most of the operators, including map and retry, which we used above; it is up to us to add the operators we need. We could add every RxJS operator with a single import statement, but we would pay a penalty in launch time and application size because the full library is so big, and we only use a few operators in our app.
Instead, we’ll import each Observable operator one by one and put the import statements in a single app/rxjs-operators.ts file.
// Statics
import 'rxjs/add/observable/throw';
// Operators
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/toPromise';
import 'rxjs/add/operator/retry';