Wednesday, October 2, 2019

ElasticSearch bulk index using JavaScript

ElasticSearch is a great and easy-to-use tool. I've already published several posts about it.


Lately I've had an issue where I was running many parallel document index requests against ElasticSearch. The code section below is a simplified version of the parallel document indexing.
See the ElasticSearch JavaScript API documentation.

const elasticsearch = require('elasticsearch');
const elasticsearchClient = new elasticsearch.Client({
    hosts: 'http://127.0.0.1:9200'
});


// index 10K documents in parallel
// (waiting for ElasticSearch to be available is omitted here)
async function indexAll() {
  const promises = [];
  for (let i = 0; i < 10000; i++) {
    const promise = elasticIndex('myindex', {
      mydata: i,
    });
    promises.push(promise);
  }
  await Promise.all(promises);
}

indexAll();



// index one document asynchronously
async function elasticIndex(index, doc) {
  return new Promise((resolve, reject) => {
    const indexConfig = {
      index: index,
      body: doc,
    };
    elasticsearchClient.index(indexConfig, (err, resp, status) => {
      if (err) {
        reject(err);
        return;
      }
      resolve();
    });
  });
}
Running this example fails with an error:

remote_transport_exception indices:data/write/bulk statusCode: 429
type: "es_rejected_execution_exception"
reason: "rejected execution of processing... queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor"
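The 429 status means the write thread pool queue is full. If you want to watch the queue and rejection counters while the test runs, the cat thread pool API exposes them (the pool is named "write" on recent ElasticSearch versions, "bulk" or "index" on older ones). A minimal sketch, assuming the legacy elasticsearch client used above:

// print per-node thread pool stats, including queue size and rejections
async function printThreadPoolStats() {
  const stats = await elasticsearchClient.cat.threadPool({
    v: true,
    h: 'name,active,queue,rejected',
  });
  console.log(stats);
}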

While you may tune the ElasticSearch configuration to somehow withstand the stress of so many parallel indexing requests, this is the wrong solution.
Instead of sending an index request per document, accumulate the documents, and once in a while, bulk index all of the accumulated documents in a single request.


A simple "delayed" bulk index is displayed in the code below.
Notice that the bulk index API is not as well documented as I've expected it to be.
I've created error handling to print the errors per failed index.
For example, bulk index of 1000 documents, might result it 30 failures and 970 success indexing.
The error handling in this example, displays how many of the documents have failed, as well as unique list of errors.

// bulk index the accumulated documents every 5 seconds
setInterval(commit, 5000);

// documents pending bulk index, grouped by index name
let indices = {};

// accumulate one document for the next bulk index
function delayedIndex(index, doc) {
  let documents;
  if (indices[index]) {
    documents = indices[index];
  } else {
    documents = [];
    indices[index] = documents;
  }
  documents.push(doc);
}
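Plugged into the earlier example, the 10K documents loop now just calls delayedIndex; the setInterval commit above does the actual network work:

// the 10K documents loop from before, now only accumulating
for (let i = 0; i < 10000; i++) {
  delayedIndex('myindex', {
    mydata: i,
  });
}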

// send one bulk request per index, and wait for all of them;
// swap the accumulator first, so documents added while the bulk
// requests are in flight are not lost
async function commit() {
  const current = indices;
  indices = {};
  await Promise.all(Object.keys(current).map(async (index) => {
    const documents = current[index];
    const response = await bulkIndex(index, documents);
    analyzeResponse(index, response);
  }));
}

// report how many documents failed, and the unique error reasons
function analyzeResponse(index, response) {
  if (response.errors === false) {
    return;
  }
  const errors = [];
  response.items.forEach((item) => {
    const error = item.index.error;
    if (error) {
      errors.push(error.reason);
    }
  });
  console.log(`bulk index ${index} failed with ` +
    `${errors.length}/${response.items.length} ` +
    `errors: ${JSON.stringify([...new Set(errors)])}`);
}
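For reference, since analyzeResponse relies on the bulk response format: errors is a top-level boolean, and each entry in items holds the per-document result, including an error object on failure. A trimmed response looks roughly like this (values are illustrative):

// shape of a bulk response (trimmed)
const sampleBulkResponse = {
  took: 30,
  errors: true,
  items: [
    { index: { _index: 'myindex', status: 201 } },
    { index: { _index: 'myindex', status: 429,
        error: { type: 'es_rejected_execution_exception',
                 reason: 'rejected execution of processing...' } } },
  ],
};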


// bulk index documents into a single index; the request body is a
// flat array of action lines ({index: ...}) each followed by its document
async function bulkIndex(index, docs) {
  if (docs.length === 0) {
    return;
  }
  const body = [];
  docs.forEach((doc) => {
    body.push({
      index: {
        _index: index,
      },
    });
    body.push(doc);
  });

  return new Promise((resolve, reject) => {
    elasticsearchClient.bulk({body: body}, (err, resp) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(resp);
    });
  });
}
This easily manages indexing of over 10K documents.
Notice that under stress, some of the document index operations might still fail.
This is working-as-designed (WAD) behavior of ElasticSearch.
If the failed documents are important to you, save them and retry at a later stage, once the stress on ElasticSearch is reduced.
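Here is a minimal sketch of such a retry. requeueFailures is a hypothetical helper, not part of the code above; it would be called from commit() next to analyzeResponse, and it relies on the bulk response items coming back in the same order as the submitted documents:

// hypothetical helper: re-queue documents whose bulk item failed,
// so the next commit() retries them; a real system should also cap
// the number of retries to avoid amplifying the stress
function requeueFailures(index, documents, response) {
  response.items.forEach((item, i) => {
    if (item.index.error) {
      delayedIndex(index, documents[i]);
    }
  });
}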





