Wednesday, October 30, 2019

Using RxJS


Recently I've used RxJS as part of our reverse proxy code. The learning curve for getting started with RxJS was short for a startup project. However, once something goes wrong in your RxJS usage, you might spend a lot of time trying to understand the problem. This is mostly due to poor error descriptions and the lack of a useful stack trace when something fails.



RxJS is built around Observable objects.
One example of this is the redux-observable library, a middleware that exposes the dispatched redux actions as an Observable stream, so they can be handled in epics.
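
To make the Observable idea concrete, here is a minimal hand-rolled sketch in plain JavaScript. It is not how RxJS is implemented; createObservable, fromArray and mapValues are hypothetical names used only for illustration. It shows the three pieces the rest of this post relies on: nothing runs until subscribe is called, pipe chains operators, and an operator is a function from one observable to another.

```javascript
// Hand-rolled stand-in for the Observable idea; NOT the RxJS implementation.
// createObservable, fromArray and mapValues are hypothetical names.
function createObservable(subscribeFn) {
  return {
    // Nothing happens until someone subscribes.
    subscribe(observer) {
      subscribeFn(observer)
    },
    // pipe() threads the observable through each operator, in order.
    pipe(...operators) {
      return operators.reduce((source, operator) => operator(source), this)
    },
  }
}

// Emits every element of an array, then completes.
function fromArray(values) {
  return createObservable((observer) => {
    values.forEach((value) => observer.next(value))
    observer.complete()
  })
}

// An operator: takes a source observable and returns a new one.
function mapValues(project) {
  return (source) =>
    createObservable((observer) => {
      source.subscribe({
        next: (value) => observer.next(project(value)),
        complete: () => observer.complete(),
      })
    })
}

const received = []
fromArray([1, 2, 3])
  .pipe(mapValues((x) => x * 10))
  .subscribe({
    next: (value) => received.push(value),
    complete: () => received.push('done'),
  })
// received is now [10, 20, 30, 'done']
```

The real library adds error handling, unsubscription and scheduling on top of this contract, which this sketch leaves out.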

For example, to handle an action of type TYPE1 and emit an action of type TYPE2, you can use the following:

import {ofType} from 'redux-observable'
import {flatMap} from 'rxjs/operators'

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return [
         { type: 'TYPE2' }
      ]
    })
  )

Notice the following:

  • action$ is the Observable of the redux actions
  • We configure the epic to handle only actions of type TYPE1
  • We return an array of actions to be dispatched next. flatMap (an alias of mergeMap) treats the returned array as an observable and emits each of its elements as a separate action.
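
The flattening in the last bullet can be seen without a store. The following is only an analogy, assuming a plain array stands in for the stream of actions: Array.prototype.flatMap is synchronous and has no notion of time, unlike the RxJS operator, and simulateEpic is a hypothetical helper. It returns two actions per match (instead of one, as in the epic above) just to make the flattening visible.

```javascript
// Analogy only: a plain array stands in for the stream of dispatched actions.
// simulateEpic is a hypothetical helper, not part of redux-observable.
function simulateEpic(actions) {
  return actions
    // Keep only the actions the epic is interested in, like ofType('TYPE1').
    .filter((action) => action.type === 'TYPE1')
    // Each match yields an array; flatMap merges the arrays into one flat list.
    .flatMap(() => [{ type: 'TYPE2' }, { type: 'TYPE3' }])
}

const dispatched = [{ type: 'TYPE1' }, { type: 'OTHER' }, { type: 'TYPE1' }]
const emittedTypes = simulateEpic(dispatched).map((action) => action.type)
// emittedTypes is ['TYPE2', 'TYPE3', 'TYPE2', 'TYPE3']
```

The result is a flat list of actions, not a list of arrays, which is why the epic can return several actions from a single callback.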

What if we want to examine the action parameters and decide accordingly which action to emit next?

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap((action) => {
      return [
         { type: action.myParameter === 2 ? 'TYPE2' : 'TYPE3' }
      ]
    })
  )

Notice that we cannot read the parameters from the action$ variable: it is not the action, but the observable wrapping the stream of actions. Inside the flatMap callback, however, we receive the actual action and its values.


RxJS can be combined with Promises. Instead of using await to wait for a promise to complete, we convert the promise to an observable:

import {from} from 'rxjs'

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    // the callback must receive the action to read its parameters
    flatMap((action) => {
      return from(myPromiseReturningFunction(action.myParameter)).pipe(
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        })
      )
    })
  )


Notice:

  • We have converted the Promise to an observable using the from function
  • We must return the observable created by from, so that the outer flatMap can subscribe to it
  • The observable created by from is handled using a new pipeline

What if something fails? This is where the power of RxJS comes into play:

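import {defer, from, throwError, timer} from 'rxjs'
import {flatMap, retryWhen} from 'rxjs/operators'

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap((action) => {
      // defer calls the function again on every resubscription; a bare
      // from(promise) would only replay the same rejected promise on retry
      return defer(() => from(myPromiseReturningFunction(action.myParameter))).pipe(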
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        }),
        retryWhen(errors => retryFunction(errors)),
      )
    })
  )


function retryFunction(errors) {
  return errors.pipe(
    flatMap((error, count) => {
      // count is the zero-based index of the error, so this allows 3 retries
      if (count >= 3) return throwError(error)
      logger.error(`retrying attempt ${count + 1}, error: ${error}`)
      return timer(100)
    }),
  )
}


retryWhen resubscribes to the source observable whenever the observable returned by our retry function emits, so we decide if and when to retry.
In this example, we retry 3 times with a 100ms delay, and if it still fails, we throw the error.
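
Because the index that flatMap passes as its second argument starts at 0, it is worth tracing how many retries a given give-up condition really allows. Here is a small sketch in plain JavaScript, with no RxJS involved; countRetries is a hypothetical helper that assumes every attempt fails and counts how many times the condition lets us retry.

```javascript
// Plain-JavaScript trace of the retry decision; no RxJS involved.
// countRetries is a hypothetical helper. shouldGiveUp receives the zero-based
// index of the current error, like the second argument of the flatMap callback.
function countRetries(shouldGiveUp) {
  let retries = 0
  // Assume every attempt fails, so errors arrive with index 0, 1, 2, ...
  for (let count = 0; !shouldGiveUp(count); count++) {
    retries++
  }
  return retries
}

const retriesWithGte = countRetries((count) => count >= 3) // 3 retries
const retriesWithGt = countRetries((count) => count > 3) // 4 retries
```

So a check of count >= 3 gives up on the fourth error (three retries after the initial attempt), while count > 3 allows one more retry than intended.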


What if we want a central location to handle errors?
We can produce a new redux action and handle it in a central error handling epic:

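import {defer, from, throwError, timer} from 'rxjs'
import {catchError, flatMap, retryWhen} from 'rxjs/operators'

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap((action) => {
      // defer makes each retry call the function again instead of replaying the same promise
      return defer(() => from(myPromiseReturningFunction(action.myParameter))).pipe(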
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        }),
        retryWhen(errors => retryFunction(errors)),
        catchError(error => [{type: 'ERROR', action, error}]),
      )
    })
  )

export const errorEpic = (action$) =>
  action$.pipe(
    ofType('ERROR'),
    flatMap(action => {
      logger.error(`error at action ${JSON.stringify(action.action)}: ${action.error}`)
      return []
    }),
  )

To sum up:
RxJS can really accelerate the development of new applications, thanks to its powerful built-in capabilities. Maintaining RxJS code might be more complex than maintaining standard Promise-based code.
I still recommend using RxJS instead of standard redux code.

