This blog aims to demonstrate concept of Observables in AngularJS. I'm using Typeahead search as an example. It describes implementing Typeahead search in Angular 1.x
Introduction
RxJS - Reactive Extensions JavaScript is dealing with streams of data asynchronously. Observables is one of the important aspects of RxJS. Observable results in stream of data. Many times, for understanding Observables they are compared to Promises. Both deal with asynchronous actions. Difference is unlike Observable, Promise is done once the asynchronous operation is complete. Consider a Http call. Promise is complete once response is obtained (or the call errors out). With Obserables data or items are emitted continuously like a stream.Where do we see stream of data in JavaScript, especially in a browser? I could think of couple of examples,
- Data bound to UI controls: As user interacts with the UI, edits a text field, selects an option in the drop down, and continues to do so, there are series of changes emitted from the control.
- Web Socket - a persistent connection. Server could send a stream of data. As long as the connection is open, browser (or other clients) could continue to obtain new pieces of data. Unlike a XHR, it's not done once we receive response soon after establishing connection.
Another difference between Promises and Observables is that, Observables can be unsubscribed (cancelled).
In the blog, I'm planning to write about "Typeahead search". Here the UI control is an Observable. It emits data items. As user starts keying in search term into the text box results are updated. It triggers XHR request with each change or set of key strokes in the text field. Show results using the latest response. Cancel in-progress, old and obsolete XHRs.
Sample is using AngularJS (v1.x) and RxJS libraries to code this functionality.
Run with Angular 1.x
bower install angular angular-rx
Reference the libraries
<script src="bower_components/rxjs/dist/rx.lite.min.js"></script> <script src="bower_components/angular/angular.min.js"></script>
<script src="bower_components/angular-rx/dist/rx.angular.js"></script>
Bootstrap Angular module with "rx" module as a dependency.
angular.module("typeAheadSample", ["rx"]);
Consider the following template for text field. This is where user keys in the search term.
<input type="text" ng-model="searchString" ng-change="search()">
Notice we are calling "search()" controller function (on $scope) for changes detected with the text field. In the controller inject "rx" service for creating an Observable and $http for making API calls.Create an observable function using rx service API,
var searchControlObservable= $scope
.$createObservableFunction('search');
RxJS has galore of operators (which are functions in JS). Before I describe one such operator, remember an Observable emits items or data. In our example as and when user keys in values into text field, observable is emitting the text to all observers (subscribers).
Each search term results in search result (from the API). And we have series of results. That means we have Observables of Observables.
Consider following code,
searchControlObservable = searchControlObservable.debounce(500)// Observable holds off 500 milliseconds before emitting data.
.flatMapLatest(function(term){
return rx.Observable
.fromPromise($http({
url: "http://localhost:3001/api/search?term=" + $scope.searchString,
method: "get"
}))
.map(function(response){
return response.data;
});
});
The callback for flatMapLatest is invoked for every emitted item of searchControlObservable (which is result of text field change events). This in turn returns Observables out of XHR calls. These are Observables from promises.
flatMap is an operator, useful in such scenarios where we have Observables of Observables. It transforms and merges. A variant of flatMap is flatMapLatest. In our scenario only the latest search result matters. Previous ones could be ignored. So it stops or unsubscribes from previous Observables. As stated earlier unlike Promises, Observables could be stopped or unsubscribed from.
Also notice debounce(500); It will hold off emitting items for 500 ms. This will help control number of XHR calls. We can increase / decrease the number depending on the requirement, acceptable limit for load on the API etc. Ideally we want search calls made for considerable text keyed-in, instead of every little change.
And finally, map returns transformed object.
Subscribe to receive items emitted by Observable,
.subscribe(function(results) {
$scope.$apply(function(){
$scope.messages = results;
});
});
Note: Subscribe function accepts three callbacks or another Observer instance. i) Above example has onNext handler (which receives items emitted by Observer). ii) Error handler iii) onCompleted handler. If an Observer object is passed, its callbacks are invoked.
Loop through the response to show the list of ,
<div ng-repeat="message in messages track by $index">
<div>{{message.name}}</div>
</div>
<div ng-repeat="message in messages track by $index">
<div>{{message.name}}</div>
</div>
Note: The API in the sample (http://localhost:3001/api/search?term=) is a dummy node API that reads from a file and returns results. It's a quick search service for demonstration purposes.
Follow the link for complete code sample. Follow instructions in ReadMe to download and run the sample.