Wednesday, October 30, 2019

Using RxJS


Recently I've used RxJS as part of our reverse proxy code. The learning curve to start using the RxJS was very fast for a startup project. However once you have a problem in the RxJS usage, you might spend a lot of time trying to understand the problem. This is mostly due to poor errors description, and no useful error stack in case of problems.



RxJS is based on an implementation and usage of Observable objects.
One example for this is the redux-observable library, which is a middleware that enables redux actions to be handled as Observable objects as part of epics.

For example, for example, to handle action of type TYPE1, and send action of TYPE2, you can use the following:

import {ofType} from 'redux-observable'
import {flatMap} from 'rxjs/operators'

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return [
         { type: TYPE2}
      ]
    })
  )

Notice the following:

  • The action$ is the Observable of the redux action
  • We configure the epic to handle only actions of type TYPE1
  • We return an array of actions that will be handled, so the flatMap converts each of the returned array elements into a new observable.

What if we want to examine the action parameters, and decide which is the next action to be handled accordingly?

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap((action) => {
      return [
         { type: action.myParameter===2 ? 'TYPE2' : ' TYPE3' }
      ]
    })
  )

We can see that we must not access the action$ variable, as it is not the action, but instead it is the observable wrapping the action. However, once we get the action into the flatMap, we get the actual action and its values.


RxJS handles can be combined with Promises. Instead of using await to wait for a promise completion, we convert the promise to observable:

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return from(myPromiseReturningFunction(action.myParameter)).pipe(
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        })
      )
    })
  )


Notice:

  • We have converted the Promise to observable using the from keyword
  • We must use return from to return the observable created by the from
  • The observable created by the from is handled using a new pipeline

What is something fails? Here is when RxJS power come into the play:
export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return from(myPromiseReturningFunction(action.myParameter)).pipe(
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        }),
        retryWhen(errors => retryFunction(errors)),
      )
    })
  )


function retryFunction(errors) {
  return errors.pipe(
    flatMap((error, count) => {
      if (count > 3) return throwError(error)
      logger.error(`retrying attempt ${count}, error: ${error}`)
      return timer(100)
    }),
  )
}


The retryWhen will rerun the entire from observable upon our decision.
In this example, we retry 3 times with 100ms delay, and if it still fails, we throw an error.


What if we want a central location to handle errors?
We can produce a new redux action to and handle it in a central errors handling epic:

export const handleType1Epic = (action$) =>
  action$.pipe(
    ofType('TYPE1'),
    flatMap(() => {
      return from(myPromiseReturningFunction(action.myParameter)).pipe(
        flatMap((myPromiseResult) => {
          return [
             { 
               type: 'TYPE2',
               result: myPromiseResult
             }
          ]
        }),
        retryWhen(errors => retryFunction(errors)),
        catchError(error => [{type: 'ERROR', action, error}]),
      )
    })
  )

export const errorEpic = (action$) =>
  action$.pipe(
    ofType('ERROR'),
    flatMap(action => {
      logger.error(`error at action ${JSON.stringify(action.action)}: ${action.error}`)
      return []
    }),
  )

To sum:
RxJS could realy accelerate the development of new applications, due to its powerful builtin capabilities. Maintaining RxJS might be more complex than standard Promises based code.
I still recommend using RxJS instead of standard redux code.


Wednesday, October 23, 2019

Docker cleanup using system prune


Running docker build usually involves creating new docker image, and tagging it with the current build number. But this leads to leaving images leftovers on the build machine, that would eventually cause out of disk space issues. Let's examine the reason for that, and how can we address this issue.


The Build Process

The build process usually includes the following steps:
  • Run docker build using tag according to the Jenkins build number
  • Push the build number tagged docker image
  • Tag the image using :latest tag as well
  • Push the latest tagged docker image
For example (assuming 123 is the current build number):

1. docker build -t my-registry/my-image:123 .
2. docker push my-registry/my-image:123
3. docker tag my-registry/my-image:123 my-registry/my-image:latest
4. docker push my-registry/my-image:latest
5. docker rmi my-registry/my-image:123

The problem starts when we run the next build.
Let examine the docker images after running build #123:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  latest  8193a6ec2439  17 seconds ago      91.4MB

And now let run build #124:

1. docker build -t my-registry/my-image:124 .
2. docker push my-registry/my-image:124

So the docker images are now:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  124     bfef88428b40  17 seconds ago      91.4MB
my-registry/my-image  latest  8193a6ec2439  55 seconds ago      91.4MB

And after the tag to latest command:

3. docker tag my-registry/my-image:124 my-registry/my-image:latest
4. docker push my-registry/my-image:latest

The images are now:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  124     bfef88428b40  17 seconds ago      91.4MB
my-registry/my-image  latest  bfef88428b40  55 seconds ago      91.4MB
<none>                <none>  8193a6ec2439  2 minutes ago       91.4MB

So now we have leftover "zombie" image marked with <none>.
This is even we have removed the previous build image, and marked the new build as "latest".
Even after removing the build 124:

5. docker rmi my-registry/my-image:124

We still have the zombie image:
REPOSITORY            TAG     IMAGE ID      CREATED             SIZE
my-registry/my-image  latest  bfef88428b40  55 seconds ago      91.4MB
<none>                <none>  8193a6ec2439  2 minutes ago       91.4MB

The Solution

The solution is to use the command:

docker system prune -f


This would remove any zombie images.
Notice this only works in combination with `docker rmi`  for the current build image, and leave only the 'latest' tag for the image. This ensures that the latest tag is replaced to the new build, and the onld image remains as <none>, hence allowing the docker system prune command to remove it.

From the docker documentation page, we can find the the system prune command removes all unused containers (and more).


See also Delete images from a Private Docker Registry.

Tuesday, October 15, 2019

Create NodeJS service with kubernetes liveness and readiness probes


To create a kubernetes service using a NodeJS server, you've probably used an express server, and configured the deployment and the service in kubernetes, and you're done!

But, wait...
What about stability?
What about upgrade?

You probably want kubernetes to restart your NodeJS application if its failing. Will it?
You probably want kubernetes to stop the old version of the application only after the new version deployment is ready. Will it?

This is where kubernetes liveness and readiness probes come into the rescue.
Let's review these probes.

Liveness Probe


The goal of the liveness probe is to signal that the pod is alive.
Well, that is obvious.
But why do we really need to do anything here?
Won't kubernetes find that our process is down, and automatically restart it?
The answer is yes, but this is not the correct question.
What if our process is up, but it is stuck, and not responding to new requests?
This is where we need to assist kubernetes to find this problem, and restart our pod.
We can do this by implementing a handler to a specific health related URL.

Readiness Probe


We already have a liveness probe.
Why do we need another?

Actually we don't have to include readiness probe in all services.
The readiness probe should be included in case you want kubernetes not to include the pod in the service, because something is not ready, but still avoid from restarting the pod.
A classical example is dependencies:


  • Service A require service B for its operation.
  • Service A is up and running, but it cannot serve its client since service B cannot be accessed.
  • Restarting service A will not fix the problem.

Implementation Example

This is an example for a NodeJS code to implement liveness and readiness probes.
const express = require('express');
const server = express();

server.get('/probe/live', (req, res) => {
 res.status(200).send("ALIVE");
});

server.get('/probe/ready', async (req, res) => {
 if (myDependenciesAreOk()){
  res.status(200).send("READY");
 }
 else{
  res.status(500).send("NOT READY");
 }
});

server.listen(8080);

To use these probes, we need to configure the kubernetes deployment.
Note the parameters for each probe should be considered to avoid high impact on the deployment from the one hand, and prevent clients from reaching to unavailable service on the other hand. (See some guidelines in here)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-deployment
spec:
  replicas: 2
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
        - name: my-app
          image: my-app:latest
          livenessProbe:
            timeoutSeconds: 5
            successThreshold: 1
            failureThreshold: 3
            initialDelaySeconds: 30
            periodSeconds: 10
            httpGet:
              path: /probe/live
              port: 8080
          readinessProbe:
            timeoutSeconds: 5
            successThreshold: 1
            failureThreshold: 1
            initialDelaySeconds: 10
            periodSeconds: 10
            httpGet:
              path: /probe/ready
              port: 8080



Monday, October 7, 2019

Create NodeJS based application using Docker WebPack and Babel

Javascript is a simple language, and using it makes new applications development very fast.
However, unlike other languages, such as Java, which can include maven as it build system, Javascript does not include a easy and fast build system. You will need to invest time in creation of the build and tests.

In this article we lay the foundations of creating a new NodeJS application that is using Webpack and babel for it transpilation (see this link for transpilation details). In addition, we will pack the application using a docker image, so it will be ready for deployment.


  


Let's start with folder structure. We will later explain and present the content for each file.
  • demo-application
    • Dockerfile
    • Dockerfile.test
    • .dockerignore
    • src
      • package.json
      • package-lock.json
      • .babelrc
      • webpack.config.js
      • main
        • demo-main.js
      • test
        • demo-main.test.js

demo-main.js

This is the actual application code.
In this example, we connect to MongoDB using mongoos library.

const mongoose = require('mongoose');
console.log('connecting to MongoDB');
mongoose.connect('mongodb://127.0.0.1:27017/db', {useNewUrlParser: true})
  .then(()=>{
     console.log('connected!');
   });

demo-main.test.js

This is where we run our unit tests.
We use proxyquire to import our code, and replacing any dependencies with our own stub.
This ensures that we are running unit test without any external dependencies.
In this case, we replace the mongoos dependency with a stub.

const chai = require('chai');
const chaiHttp = require('chai-http');
const proxyquire = require('proxyquire');
const expect = chai.expect;
chai.should();
chai.use(chaiHttp);


async function importCode() {
  proxyquire.noCallThru().load('../main/demo-main.js',
    {
      'mongoose': {
        connect: function(){
   console.log('stub connection to mongo');
 }
      }
    }
  );
}

describe('tests', () => {
  it('check main code', async () => {
    const main = await importDeploy();
  });
});

package.json

The package json includes the dependencies, as well as the build and test commands.
The scripts include:

  • build: creates bundle.js from our source
  • start: run the application
  • dev: runs nodemon which automatically recompiles the application upon source change. This is useful for debugging
  • unit-test: run the tests, and create coverage report
The dependencies include the only dependency we have in our code: mongoose.

The dev dependencies include all the libraries that we need for transpilation, and for unit test.


{
  "name": "demo",
  "scripts": {
    "build": "webpack --config ./webpack.hook.js",
    "start": "node build/bundle.js",
    "dev": "nodemon --watch main --exec node build/bundle.js",
    "unit-test": "nyc mocha ./test/**/*.js 
                  --require @babel/register 
                  --require @babel/polyfill 
                  --timeout 10000 
                  --reporter=xunit 
                  --reporter-options output=coverage.xml"
  },
  "dependencies": {
    "mongoose": "^5.6.9"
  },
  "devDependencies": {
    "@babel/cli": "^7.5.5",
    "@babel/core": "^7.5.5",
    "@babel/node": "^7.5.5",
    "@babel/plugin-proposal-class-properties": "^7.5.5",
    "@babel/polyfill": "^7.4.4",
    "@babel/preset-env": "^7.5.5",
    "@babel/register": "^7.5.5",
    "babel-loader": "^8.0.6",
    "chai": "^4.2.0",
    "chai-http": "^4.2.1",
    "mocha": "^6.0.2",
    "nodemon": "^1.18.10",
    "nyc": "^14.1.1",
    "proxyquire": "^2.1.0",
    "webpack": "^4.39.2",
    "webpack-cli": "^3.3.6"
  },
  "nyc": {
    "all": true,
    "reporter": [
      "lcov"
    ]
  }
}

package-lock.json

This file is automatically generated once we run `npm install`.
The file includes all the libraries that the project depend on.
This file should be added to the source control system, to ensure stability of the project.

.babelrc

This file instruct the babel to automatically select the transpilation level that we need.
In addition, we include the source maps in the bundle.js, so we will be able to debug our code.
{
  "presets": [
    "@babel/preset-env"
  ],
  "sourceMaps": true,
  "retainLines": true
}

webpack.config.js

This file configured webpack.
It instruct webpack to create the bundle.js file, and set the demo-main.js as the starting point for the project.

const path = require('path');
const nodeExternals = require('webpack-node-externals');
const env = process.env.NODE_ENV || "development";

console.log(`node environment is ${env}`);
console.log(`current dir is ${process.cwd()}`);

module.exports = {
    mode: env,
    target: 'node',
    entry: [
        '@babel/polyfill',
        './main/demo-main.js'
    ],
    output: {
        filename: 'bundle.js',
        path: path.resolve(__dirname, 'build'),
    },
    devtool: "eval-source-map",
    module: {
        rules: [
            {
                test: /\.js$/,
                loader: "babel-loader",
                exclude: "/node_modules/",
                options: {
                    presets: ["@babel/preset-env"]
                }
            }
        ]
    },
    externals: [nodeExternals()],
};

Dockerfile

The Dockerfile contains instructions to build the docker image for our application.
Notice that the docker entry point calls directly to node, due to SIGTERM killing npm without waiting for the child node process.
To run the build, run: `docker build -f Dockerfile`

FROM node:12.7 as builder
WORKDIR /app
COPY src/package.json ./
COPY src/package-lock.json ./
RUN npm install
COPY src/ ./
RUN npm run build
CMD ["node", "build/bundle.js"]

Dockerfile.test

Very similar to the one before, only that we run the tests, instead of the application.
To run the tests, run: `docker build -f Dockerfile.test`
FROM node:12.7 as builder
WORKDIR /app
COPY src/package.json ./
COPY src/package-lock.json ./
RUN npm install
COPY src/ ./
RUN npm run build
CMD ["npm","run", "unit-test"]

.dockerignore

This file contains the list of files we want to avoid from being sent to the docker service upon build.
This saves significant time upon build.
**/node_modules
**/build


Wednesday, October 2, 2019

ElasticSearch bulk index using javascript

ElasticSearch is great and easy to use tool. I've already published several posts about it.


Lately I've had an issue were I was running many parallel documents index requests into ElasticSearch. The code section below is a simplified version of the parallel documents indexing.
See ElasticSearch javascript API documentation.

const elasticsearch = require('elasticsearch');
const elasticsearchClient = new elasticsearch.Client({
    hosts: 'http://127.0.0.1:9200'
});


// index of 10K documents in parallel
const elastic = elasticFactory.produce();
await elastic.waitForElasticSearch();
const promises = [];
for (let i = 0; i < 10000; i++) {
  const promise = elasticIndex('myindex', {
    mydata: i,
  });
  promises.push(promise);
}
await Promise.all(promises);



// function to handle async index of one document
async function elasticIndex(index, doc) {
  const promise = new Promise((resolve, reject) => {
  const indexConfig = {
    index: index,
    body: doc,
  };
  elasticsearchClient.index(indexConfig, (err, resp, stats) => {
    if (err) {
      reject(err);
    }
      resolve();
    });
  });
  await promise;
}
Running this example fails with error:

remote_transport_exception indices:data/write/bulk statusCode":429
type: "es_rejected_execution_exception"
reason: "rejected execution of processing... queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor

While you may update ElasticSearch configuration to somehow stand the stress of handling so many parallel indexing  requests, this is the wrong solution.
Instead of using index per document, accumulate the documents, and once in a while, bulk index all of the documents in a single request.


A simple "delayed" bulk index is displayed in the code below.
Notice that the bulk index API is not as well documented as I've expected it to be.
I've created error handling to print the errors per failed index.
For example, bulk index of 1000 documents, might result it 30 failures and 970 success indexing.
The error handling in this example, displays how many of the documents have failed, as well as unique list of errors.

// bulk index the documents every 5 seconds
setInterval(commit, 5000);

let indicies = {};

function delayedIndex(index, doc) {
  let documents;
  if (indicies[index]) {
    documents = indicies[index];
  } else {
    documents = [];
    indicies[index] = documents;
  }
  documents.push(doc);
}

async function commit() {
  Object.keys(indicies).forEach(async (index) => {
    const documents = indicies[index];
    const response = await bulkIndex(index, documents);
    analyzeResponse(index, response);
  });
  indicies = {};
}

function analyzeResponse(index, response) {
  if (response.errors === false) {
    return;
  }
  let errors = [];
  response.items.forEach((i) => {
    const error = i.index.error;
    if (error) {
      errors.push(error.reason);
    }
  });
  console.log(`bulk index ${index} failed with 
    ${errors.length}/${response.items.length} 
    errors: ${JSON.stringify([... new Set(errors)])}`);
}


async function bulkIndex(index, docs) {
  if (docs.length === 0) {
    return;
  }
  const body = [];
  docs.forEach(doc => {
    body.push({
      index: {
        _index: index
      }
    });
    body.push(doc);
  });

  const promise = new Promise((resolve, reject) => {
    elasticsearchClient.bulk({body: body},
    (err, resp) => {
      if (err) {
        reject(err);
      }
      resolve(resp);
    });
  });
  return await promise;
}
This easily manage indexing of over 10K documents.
Notice that in case of stress, some of the document indexing might fail.
This is WAD by ElasticSearch.
If you find it important, you should save the failed documents, and retry at a later stage, once the stress on the ElasticSearch is reduced.