Explore RxJS: build a small GitHub app


This article is a hands-on RxJS tutorial: we use RxJS and the GitHub API to build a small GitHub application step by step. The focus is on how to use RxJS, so the ES6 syntax, webpack, and other topics that come up along the way are not explained.



All the code for this example is in the GitHub repository: rxjs-example

PS: Zhihu's editor is the hardest to use that I have ever come across, so for a better reading experience, please head over to: github/Coding-Guide: Explore RxJS-make a github app


The first thing to note is that there are currently two mainstream RxJS repositories on GitHub, representing different versions:

  • Reactive-Extensions/RxJS: RxJS 4.x
  • ReactiveX/rxjs: RxJS 5 beta

The two versions are installed and imported slightly differently:

# 4.x
$ npm install rx --save

# 5 beta
$ npm install rxjs --save

// 4.x
import Rx from 'rx';

// 5 beta
import Rx from 'rxjs/Rx';
 

Their syntax also differs slightly. For example, in 5 beta you can pass subscribe either an observer object or callback functions, whereas in 4.x you pass callback functions:

// 5 beta: subscribe with an observer object
var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// observable stands for any Observable instance
observable.subscribe(observer);

// 5 beta and 4.x: subscribe with callback functions
observable.subscribe(x => console.log(x), (err) => console.log(err), () => console.log('completed'));
 

For more syntax differences between the two versions, refer to the 4.x to 5 migration guide in the RxJS 5 repository.


Let's start

As mentioned above, we will use RxJS and the GitHub API to build a small GitHub application step by step. We start with its basic function: the user types into an input, and each change of the input value triggers an asynchronous request that calls the GitHub API to search. The finished application (see the online demo) does two things:

  • While the user types, an asynchronous search runs in real time through RxJS
  • After hovering over an avatar, the user's information is fetched asynchronously

Install webpack to configure the compilation environment and use ES6 syntax. Install the following dependencies and configure webpack:

  • webpack
  • webpack-dev-server
  • babel-loader
  • babel-preset-es2015
  • html-webpack-plugin
  • css-loader/postcss and others
  • jquery
  • rx (version 4.x)

With webpack-dev-server we start a server on port 8080, so the compiled resources can be visited at localhost:8080/webpack-dev-server.



Initialize the DOM event stream

Write an input in index.html; in index.js we listen to its keyup event through an RxJS Observable. fromEvent creates a stream based on DOM events, and map and filter process it further.

<!-- index.html -->

<input class="search" type="text" maxlength="1000" required placeholder="search in github"/>
 
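//src/js/index.js
import $ from 'jquery';
import Rx from 'rx';

$(() => {
  const $input = $('.search');

  // create a stream from the input's keyup event
  const observable = Rx.Observable.fromEvent($input, 'keyup')
                       // read the input's value on every keyup and keep only non-empty values
                       .map(() => $input.val().trim())
                       .filter((text) => !!text)
                       // do runs side effects that do not affect the stream; here it logs the value
                       .do((value) => console.log(value));

  // start listening
  observable.subscribe();
});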
 

Type anything into the input: the keyup event is now being monitored, and the current value of the input is printed to the console on every keyup.



Asynchronous acquisition in real time

Now that we are listening to the input, we get its value on every keyup and can use that value to fetch data asynchronously. Breaking down the whole process:

  1. The user enters any content in the input
  2. The keyup event fires and we read the current value
  3. The value is passed to an asynchronous method, which fetches data through the API
  4. The DOM is rendered with the returned data

In other words, we need to process the value of each event in the original Observable asynchronously and turn the result into a new Observable. This can be done in two steps:

  1. Map each value to an Observable
  2. Use flatMap to flatten all of those Observables into a single new Observable

[Figure: flatMap illustrated]
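To make the flattening step more concrete, here is a minimal sketch that does not use RxJS at all. miniFrom, miniFlatMap and fakeSearch are hypothetical helpers written only for this illustration; they model a stream as a function that pushes values into a callback, which is a simplification and not how RxJS is implemented.

```javascript
// Hypothetical mini "stream": a function that pushes values into a callback.
// This only illustrates the idea; it is not how RxJS is implemented.
const miniFrom = (values) => (next) => values.forEach((v) => next(v));

// For each outer value, build an inner stream and forward its values
// to the same downstream callback, so the result is one flat stream.
const miniFlatMap = (source, project) => (next) =>
  source((outerValue) => project(outerValue)(next));

const keywords = miniFrom(['rx', 'rxjs']);

// Stand-in for getRepos: each keyword yields a stream of two fake results.
const fakeSearch = (q) => miniFrom([`${q}-repo-1`, `${q}-repo-2`]);

const flatResults = [];
miniFlatMap(keywords, fakeSearch)((value) => flatResults.push(value));

// All inner results arrive in a single flat sequence.
console.log(flatResults);
```

The downstream callback never sees the inner streams themselves, only their values, which is what lets the later steps of the pipeline work with plain search results.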


Since the data is fetched asynchronously, the Observable in the first step above can be created with fromPromise:

//src/js/helper.js
import $ from 'jquery';
import Rx from 'rx';

const SEARCH_REPOS = 'https://api.github.com/search/repositories?sort=stars&order=desc&q=';

// wrap the ajax request in a promise
const getReposPromise = (query) => {
  return $.ajax({
    type: "GET",
    // encode the query so characters such as & or # do not break the URL
    url: `${SEARCH_REPOS}${encodeURIComponent(query)}`,
  }).promise();
};

// create an Observable from the promise with fromPromise
export const getRepos = (query) => {
  const promise = getReposPromise(query);
  return Rx.Observable.fromPromise(promise);
};
 
//src/js/index.js

import {getRepos} from './helper';

//...
const observable = Rx.Observable.fromEvent($input, 'keyup')
                     .map(() => $input.val())
                     .filter((text) => !!text)
                     .do((value) => console.log(value))
                    // getRepos returns an Observable for each value;
                    // flatMap flattens those Observables into one Observable
                     .flatMap(getRepos);

//...
 

Now every keyup fetches data asynchronously based on the current value of the input. But this approach has several problems:

  • While the user types continuously, requests are fired continuously, which wastes resources and hurts the experience
  • If the value of the input is the same on adjacent keyup events, that is, if a key that does not change the value (such as an arrow key) is pressed, the same request is sent again
  • When several requests are in flight, they do not necessarily take the same time. If an earlier request takes longer than a later one, its result arrives last and may overwrite the result of the later request

So next we will deal with these issues.



Optimize event flow

Let's address the problems above one at a time.


While the user types continuously, requests are fired continuously, which wastes resources and hurts the experience

In other words, while the user is still typing we should not run the rest of the pipeline; a request should be sent only once typing pauses, that is, once the gap after a keyup is long enough. RxJS's debounce method does exactly this.

When events fire in quick succession, debounce holds them back; a value is passed on only after the specified interval has elapsed with no newer event:

//src/js/index.js

//...
const observable = Rx.Observable.fromEvent($input, 'keyup')
                    // pass a keyup on only after 400ms without another keyup
                     .debounce(400)
                     .map(() => $input.val())
                     .filter((text) => !!text)
                     .do((value) => console.log(value))
                     .flatMap(getRepos);

//...
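
To see which keyups survive a debounce, the following sketch simulates the rule on a list of timestamped events instead of using real timers. simulateDebounce and the sample timings are assumptions made for this illustration; the real operator works on live events, and in the pipeline above it sits before map, so it actually receives keyup events rather than values.

```javascript
// Hypothetical helper: given events sorted by time, keep only those that are
// followed by at least `wait` ms of silence (the last event always survives).
const simulateDebounce = (events, wait) =>
  events.filter((event, i) => {
    const nextEvent = events[i + 1];
    return !nextEvent || nextEvent.time - event.time >= wait;
  });

// Keyups while typing "rxjs" quickly, a pause, then one more character.
const keyups = [
  { time: 0, value: 'r' },
  { time: 100, value: 'rx' },
  { time: 250, value: 'rxj' },
  { time: 300, value: 'rxjs' },
  { time: 1000, value: 'rxjs5' },
];

const debounced = simulateDebounce(keyups, 400).map((e) => e.value);

// Only the values typed right before a pause remain.
console.log(debounced);
```

With these timings, five keyups would lead to two requests instead of five.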
 

If the value of the input is the same on adjacent keyup events, that is, if a key that does not change the value (such as an arrow key) is pressed, the same request is sent again

In other words, when adjacent events carry the same value, only one of them (the first of the run) should be kept. The distinctUntilChanged method does this:

//src/js/index.js

//...

const observable = Rx.Observable.fromEvent($input, 'keyup')
                     .debounce(400)
                     .map(() => $input.val())
                     .filter((text) => !!text)
                    // drop a value that equals the previous one
                     .distinctUntilChanged()
                     .do((value) => console.log(value))
                     .flatMap(getRepos);

//...
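
The rule is easy to check on a plain array. dropConsecutiveDuplicates below is a hypothetical array version written for this illustration, not the RxJS operator itself; it also shows that only adjacent repeats are removed, so a value may appear again later.

```javascript
// Hypothetical array version of the idea: drop a value when it equals
// the value immediately before it.
const dropConsecutiveDuplicates = (values) =>
  values.filter((value, i) => i === 0 || value !== values[i - 1]);

// 'rx' arrives three times in a row (for example because of arrow keys),
// and shows up once more after the value has changed.
const typed = ['rx', 'rx', 'rx', 'rxjs', 'rx'];
const distinct = dropConsecutiveDuplicates(typed);

console.log(distinct);
```

The final 'rx' is kept because the value before it was 'rxjs', so a new search for it is still wanted.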
 

When several requests are in flight, they do not necessarily take the same time. If an earlier request takes longer than a later one, its result arrives last and may overwrite the result of the later request

You have probably run into this painful problem. When several asynchronous requests are sent, they do not all take the same time, so the order in which they return is not guaranteed, and the result of an earlier request may overwrite the result of a later one.



The way to handle this: when several requests are sent in succession, we only care about the result of the last one, so the earlier ones can be dropped regardless of what they return. For that we can use the flatMapLatest API (similar to switchMap in RxJava; it was renamed switchMap in RxJS 5.0).


With flatMapLatest, whenever the source emits and a new inner Observable is returned, the subscription to the previous inner Observable is disposed, so its result is ignored and only the current one is listened to. That is, when several requests are sent, only the response to the last one is processed:

//src/js/index.js

//...

const observable = Rx.Observable.fromEvent($input, 'keyup')
                     .debounce(400)
                     .map(() => $input.val())
                     .filter((text) => !!text)
                     .distinctUntilChanged()
                     .do((value) => console.log(value))
                    // only keep the result of the latest request
                     .flatMapLatest(getRepos);

//...
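
The "latest wins" behaviour can be sketched without RxJS. createLatestOnly is a hypothetical helper for this illustration: it hands every request a ticket number and drops any response whose ticket is no longer the newest. flatMapLatest reaches the same outcome by disposing the previous inner subscription rather than by counting tickets.

```javascript
// Hypothetical "latest wins" helper: each call gets a ticket number, and a
// response is delivered only if its ticket is still the newest one.
const createLatestOnly = (onResult) => {
  let latestTicket = 0;
  return (query) => {
    const ticket = ++latestTicket;
    // Returns the function to call when the response for `query` arrives.
    return (response) => {
      if (ticket === latestTicket) onResult(query, response);
    };
  };
};

const rendered = [];
const search = createLatestOnly((query, response) => {
  rendered.push(`${query}: ${response}`);
});

// Two requests are started; the older one answers last.
const respondToFirst = search('r');
const respondToSecond = search('rx');
respondToSecond('results for rx');
respondToFirst('results for r'); // stale response, ignored

console.log(rendered);
```

Only the response to the newest query is rendered, even though the older response arrived later.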
 

Stream monitoring

At this point the whole event stream, from input keyup to asynchronous data fetching, is in place, with optimizations that avoid excessive requests and out-of-order responses. But a stream does nothing until it is subscribed to, so we still need to subscribe:

//src/js/index.js

//...

const observable = Rx.Observable.fromEvent($input, 'keyup')
                     .debounce(400)
                     .map(() => $input.val())
                     .filter((text) => !!text)
                     .distinctUntilChanged()
                     .do((value) => console.log(value))
                     .flatMapLatest(getRepos);

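// subscribe to receive the returned data
observable.subscribe((data) => {
  // the search API wraps the matching repositories in data.items;
  // showNewResults renders them into the DOM
  showNewResults(data.items);
}, (err) => {
  console.log(err);
}, () => {
  console.log('completed');
});


// items is an Array of repository objects;
// each item is rendered to an HTML string with reposTemplate, and the
// result is inserted into content_container through jQuery
const showNewResults = (items) => {
  const repos = items.map((item) => {
    return reposTemplate(item);
  }).join('');
  $('.content_container').html(repos);
};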
 

With this, a complete flow for handling events through RxJS is in place.

[Figure: the complete event flow]

And if we did not use RxJS and listened to the input in the traditional way:

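//src/js/index.js

// note: this version calls .then, so getRepos is assumed here to return
// a promise rather than the Observable created in helper.js
import { getRepos } from './helper';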


$(() => {

  const $input = $('.search');
  const interval = 400;

  let previousValue = null;
  let fetching = false;
  let lastKeyUp = Date.now() - interval;

  $input.on('keyup', (e) => {

    const nextValue = $input.val();

    if (!nextValue) {
      return;
    }
    if (Date.now() - lastKeyUp <= interval) {
      return;
    }

    lastKeyUp = Date.now();

    if (nextValue === previousValue) {
      return;
    }

    previousValue = nextValue;

    if (!fetching) {
      fetching = true;
      getRepos(nextValue).then((data) => {
        fetching = false;
        // the search API wraps the repositories in data.items
        showNewResults(data.items);
      });
    }

  });
});
 

Isn't that complicated? And even then it is still not adequate. The code above only uses the fetching flag to check whether a request is in flight, and if one is, no new request is sent. What we would really like is to cancel the old request and handle only the new one.



More elegant Rx style

Following the tutorial above, we take the input value in the Observable, send an asynchronous request, and get the latest response. Then, in the subscribe callback, the response is assembled into HTML and inserted into the DOM.

But there is a problem: the application's other feature is to fetch and display user information asynchronously when the mouse hovers over an avatar. The avatars, however, are inserted dynamically inside the subscribe callback. How do we create an event stream for them?

Of course, you could use `fromEvent` to create a hover-based event stream after each insertion into the DOM, but that is not ideal, and the resulting code is not very Rx. Maybe we should not cut the stream off after .flatMapLatest(getRepos)? But in that case, how do we insert the response into the DOM?


For this situation we can use the RxJS do method.


You can do anything you like in the do callback without affecting the events in the stream; the callback also receives the value of each event:

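var observable = Rx.Observable.from([0, 1, 2])
    // do sees each value and passes it on unchanged
    .do((x) => console.log(x))
    .map((x) => x + 1);

observable.subscribe((x) => {
  console.log(x);
});

// logs 0, 1, 1, 2, 2, 3: each value goes through do, then map, then subscribe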
 

Therefore, we can use do to complete the DOM rendering:


//src/js/index.js

//...

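// $conatiner is the div that holds the search results
const $conatiner = $('.content_container');

const observable = Rx.Observable.fromEvent($input, 'keyup')
                     .debounce(400)
                     .map(() => $input.val())
                     .filter((text) => !!text)
                     .distinctUntilChanged()
                     .do((value) => console.log(value))
                     .flatMapLatest(getRepos)
                    // clear the container before rendering the new results
                     .do((results) => $conatiner.html(''))
                    // the repositories are in results.items; Rx.Observable.from turns that
                    // array into an Observable, and flatMap emits its items one by one
                     .flatMap((results) => Rx.Observable.from(results.items))
                    // turn each item into a jQuery object
                     .map((repos) => $(reposTemplate(repos)))
                    // append each jQuery object to the container
                     .do(($repos) => {
                       $conatiner.append($repos);
                     });

// subscribe only starts the stream; the rendering already happens in do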
observable.subscribe(() => {
  console.log('success');
}, (err) => {
  console.log(err);
}, () => {
  console.log('completed');
});
 

Simply perfect! Now the observable ends with a map that emits jQuery objects one by one. If we later want to listen for hover on the avatars, we can build on this stream.



Create a hover-based event flow

Next we create a stream for the hover event on the user avatar. The user's details are loaded asynchronously, and a modal pops up when the mouse hovers over the avatar. On the first hover the modal contains only a loading icon while the data is fetched asynchronously, and the returned data is then inserted into the modal; if the data has already been fetched and inserted, no request is sent and the modal is shown directly:

  • With no data yet, show the loading icon and fetch the data asynchronously
  • Insert the data once the request returns; if the data is already there, show it directly

Setting the previous stream aside for now, let's create a new event stream:

//src/js/index.js

//...

const initialUserInfoSteam = () => {

  const $avator = $('.user_header');

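 // create a stream from the mouseover (hover) event on $avator
  const avatorMouseover = Rx.Observable.fromEvent($avator, 'mouseover')
       // pass a mouseover on only after 500ms without another one
        .debounce(500)
       // keep the stream alive only while the user info has not been loaded
        .takeWhile((e) => {
         // find the DOM node that holds the user info and check whether it already contains infos_container;
         // once infos_container exists, takeWhile returns false and the stream terminates
          const $infosWrapper = $(e.target).parent().find('.user_infos_wrapper');
          return $infosWrapper.find('.infos_container').length === 0;
        })
        .map((e) => {
          const $infosWrapper = $(e.target).parent().find('.user_infos_wrapper');
          return {
            conatiner: $infosWrapper,
            url: $(e.target).attr('data-api')
          }
        })
        .filter((data) => !!data.url)
       // getUser (not shown here) fetches the user info asynchronously
        .flatMapLatest(getUser)
        .do((result) => {
         // showUserInfo inserts the info into the DOM, which adds the infos_container div,
         // so takeWhile returns false on the next hover and no further request is sent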
          const {data, conatiner} = result;
          showUserInfo(conatiner, data);
        });

  avatorMouseover.subscribe((result) => {
    console.log('fetch user info succeed');
  }, (err) => {
    console.log(err);
  }, () => {
    console.log('completed');
  });
};
 

One API in the code above needs explaining: takeWhile



As long as the callback in takeWhile returns true, the stream carries on as normal; as soon as it returns false, no further events are emitted and the stream terminates immediately:


var source = Rx.Observable.range(1, 5)
               .takeWhile(function (x) { return x < 3; });

var subscription = source.subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); }
);

// range(1, 5) emits 1 to 5; takeWhile stops at the first value that is not < 3
//Next: 1
//Next: 2
//Completed
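
The difference from filter is worth spelling out, because it is the reason the hover stream uses takeWhile: once the user info has been loaded, that avatar's stream should end for good rather than merely skip one event. takeWhileArray below is a hypothetical array version written for this comparison, not the RxJS operator.

```javascript
// Hypothetical array version of takeWhile: stop at the first failing value.
const takeWhileArray = (values, predicate) => {
  const taken = [];
  for (const value of values) {
    if (!predicate(value)) break; // the sequence ends here for good
    taken.push(value);
  }
  return taken;
};

const values = [1, 2, 5, 1, 2];
const isSmall = (x) => x < 3;

// takeWhile ends at 5 and never sees the later 1 and 2.
const withTakeWhile = takeWhileArray(values, isSmall);
// filter only skips 5 and keeps going.
const withFilter = values.filter(isSmall);

console.log(withTakeWhile);
console.log(withFilter);
```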
 

After creating an event stream for hover, we can combine it with the previous event stream:


//src/js/index.js

//...

const initialUserInfoSteam = ($repos) => {
  const $avator = $repos.find('.user_header');
 //...
}

const observable = Rx.Observable.fromEvent($input, 'keyup')
                    //...
                     .do(($repos) => {
                       $conatiner.append($repos);
                       initialUserInfoSteam($repos);
                     });

//...
 

This now works, but it is still not good enough. There are currently two streams: one listening to the input's keyup and one listening to mouseover.

Because the avatars are inserted dynamically, the mouseover stream can only be created and subscribed to after $conatiner.append($repos);. But since the fetched data is already inserted in the final do, we can try to merge the two streams into one:

//src/js/index.js

//...

const initialUserInfoSteam = ($repos) => {

  const $avator = $repos.find('.user_header');

  const avatorMouseover = Rx.Observable.fromEvent($avator, 'mouseover')

 //...
 // no subscribe here; return the Observable instead
  return avatorMouseover;
};

const observable = Rx.Observable.fromEvent($input, 'keyup')
 //...
  .do(($repos) => {
    $conatiner.append($repos);
   // no longer called inside do:
   //initialUserInfoSteam($repos);
  })
 // use flatMap to merge the mouseover stream into the main stream
  .flatMap(($repos) => {
    return initialUserInfoSteam($repos);
  });

//...
 

DONE!


APIs

RxJS APIs used in this example:


  • from: creates a stream from an iterable object
  • fromEvent: creates a stream from DOM events
  • debounce: while events keep firing within the given interval, nothing is passed on; a value is emitted only after the interval passes with no new event
  • map: transforms every event in the stream and returns a new stream
  • filter: keeps only the events that pass the test and returns a new stream
  • flatMap: maps the value of each event to an Observable, then flattens all of those Observables into a new Observable
  • flatMapLatest: like flatMap, but only the most recently returned Observable is listened to; results from earlier ones are ignored
  • distinctUntilChanged: when adjacent events carry the same value, only the first is kept (consecutive duplicates are removed)
  • do: receives the value of each event in turn, for side effects that do not affect the stream
  • takeWhile: lets events through as long as its callback returns true; once it returns false, the stream terminates


Extended reading