Wednesday, November 27, 2019

MongoDB ReplicaSet on kubernetes



Recently I wanted to use MongoDB ReplicaSet on kubernetes.
I did not want to use MongoDB cluster, as it is too complex to configure and maintain in a kubernetes environment. I also did not need high MongoDB performance, so sharding was also not required.

Gladly, I've found a Helm chart in the official helm chart github (see here).
It even included special handling for kubernetes pod initialization, in a dedicated script: on-init.sh

However, I've found this is not working.



There were several issues. For example:

  1. Once a MongoDB pod was added to a ReplicaSet configuration, it was never removed. This causes several problems. For example, inability to get quorum once a pod had been restarted and got a different IP.
  2. In case a MongoDB pod had started, but was not able to communicate with the previous pods, it started its own new ReplicaSet, instead of failing.
  3. A single MongoDB pod, which was restarted, and got a new IP, was not able to start the ReplicaSet, and got error that it is not part of its own replica.



I've created my own init container to manage the ReplicaSet configuration. 
It was inspired by the on-init.sh script.
I've decided to use nodeJS, as the logic was too much for a simple shell script.

First, I've created a StatefulSet:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mongodb-statefulset
spec:
  replicas: 3
  selector:
    matchLabels:
      configid: mongodb-container
  template:
    metadata:
      labels:
        configid: mongodb-container
    spec:
      serviceAccountName: mongodb-service-account
      initContainers:
        - name: init
          image: LOCACL_REGISTRY/mongo-init/dev:latest
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            - name: persist-data
              mountPath: /data
              readOnly: false
      containers:
        - name: mongodb
          image: mongo:4.2.1
          imagePullPolicy: Always
          command:
            - mongod
          args:
            - --config=/mongo-config/mongo.conf
            - --dbpath=/data
            - --replSet=mongoreplica
            - --port=27017
            - --bind_ip=0.0.0.0
          volumeMounts:
            - name: persist-data
              mountPath: /data
              readOnly: false
            - name: mongo-config
              mountPath: /mongo-config
      volumes:
        - name: mongo-config
          configMap:
            name: mongo-config
  volumeClaimTemplates:
    - metadata:
        name: persist-data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: "hostpath"

The StatefulSet includes an init container which runs the nodeJS code to configure the ReplicaSet (this will be covered later in this article).
In addition, it includes a volumeClaimTemplate that allocates the same storage for the related pod even it is restarted.
Also, a config map with mongo.conf file is included. I've currently used an empty file, as I require only the default configuration.

Notice that the StatefulSet includes a serviceAccountName. This account should be granted with permissions to list and get pods.

Now, let review the mongo-init container.
This includes a nodeJS application to configure the MongoDB ReplicaSet.




The following general logic is implemented:

  • Start Mongo DB
  • Run kubectl to get all  related pod IPs
  • Check each of the pods, to located the Mongo DB primary
  • If primary is located, and it is myself, we're done
  • If primary is located, and it is another pod, find my configuration index according to the pod name, e.g. pod name mongodo-statefulset-4 is index#4. If this index exists in the configuration, update the IP in the existing replica. Otherwise, add new secondary.
  • If primary is not located, start a new ReplicaSet.
  • Stop Mongo DB, and wait for shutdown, allowing it to save the updated configuratoin


const podIp = process.env['POD_IP']
const podName = process.env['HOSTNAME']
const mongoReplicaSetName = 'mongoreplica'
const mongoPort = 27017'
const addSecondaryRetries = 20

init()

async function init() {
  await startMongo()
  await configureReplicaSet()
  await stopMongo()
}

async function startMongo() {
  const command = `mongod --config=files/mongo.conf --dbpath=files/data --replSet=${mongoReplicaSetName} --port=${mongoPort} --bind_ip=0.0.0.0`
  const options = {
    checkError: false,
    waitForCompletion: false,
    streamOutputCallback: mongoOutputCallback,
  }
  const promiseWrapper = await runCommand(command, options)
  const commandPromise = promiseWrapper.promise

  const startedPromise = waitForPing()

  const result = await Promise.race([commandPromise, startedPromise])
  if (!result.mongoResponse) {
    throw new Error(`mongo start failed`)
  }
}

function mongoOutputCallback(data) {
  data.split('\n').forEach(line => {
    line = line.trim()
    if (line.length > 0) {
      console.log(`[MONGO] ${line}`)
    }
  })
}

async function stopMongo() {
  const mongoCommand = `db.shutdownServer({force: true})`
  await runMongoAdmin(false, '127.0.0.1', mongoCommand)
  await waitForMongoStop()
}

async function waitForMongoStop() {
  while (true) {
    const processes = await runCommand('ps -ef | grep mongo | grep -v grep', {checkError: false})
    if (processes.trim().length === 0) {
      return
    }

    await sleep(1000)
  }
}

async function configureReplicaSet() {
  const primary = await findPrimaryNode()
  if (primary) {
    await configureReplicaWithExistingPrimary(primary)
  } else {
    if (await isSecondaryPodLocated()) {
      throw new Error('secondary pod located, unable to start until primary located')
    }
    if (!await isReplicaConfigured()) {
      await createNewReplicaSet()
    }
  }
}

async function configureReplicaWithExistingPrimary(primary) {
  if (primary === podIp) {
    return
  }

  const memberIndex = await findMemberIndex(primary)
  if (memberIndex === null) {
    await addAsSecondary(primary)
  } else {
    await updateMemberAddress(primary, memberIndex)
  }
  await waitUntilSecondaryReady()
}

async function updateMemberAddress(primary, memberIndex) {
  const mongoCommand = `c=rs.conf(); c.members[${memberIndex}].host='${podIp}'; rs.reconfig(c)`
  await runMongoAdmin(true, primary, mongoCommand)
}

async function findMemberIndex(primary) {
  const configuration = await getReplicaConfiguration(primary)
  const members = configuration.members
  const podSuffix = parseInt(podName.substring(podName.lastIndexOf('-') + 1))

  for (let i = 0; i < members.length; i++) {
    const member = members[i]
    const memberId = parseInt(member['_id'])
    if (memberId === podSuffix) {
      return i
    }
  }
  return null
}

async function getReplicaConfiguration(primary) {
  let configurationText = await runMongoAdmin(true, primary, `rs.conf()`)
  configurationText = configurationText.replace(/NumberLong\((\d+)\)/g, '$1')
  configurationText = configurationText.replace(/ObjectId\("(\S+)"\)/g, '"$1"')
  return JSON.parse(configurationText)
}

async function isReplicaConfigured() {
  const result = await runMongoAdmin(false, '127.0.0.1', 'rs.status()')
  const configured = !result.includes('no replset config has been received')
  return configured
}

async function addAsSecondary(primary) {
  for (let i = 1; i <= addSecondaryRetries; i++) {
    try {
      await addAsSecondaryOnce(primary)
      return
    } catch (e) {
      console.log(`add secondary node failed: ${e.stack}`)
      if (i < addSecondaryRetries ) {
        console.log(`retry #${i} in a moment`)
        await sleep(10000)
      }
    }
  }
  throw new Error(`add node as secondary failed after ${addSecondaryRetries} retries`)
}

async function addAsSecondaryOnce(primary) {
  const mongoCommand = `rs.add('${podIp}:${mongoPort}')`
  const result = await runMongoAdmin(true, primary, mongoCommand)
  if (result.includes(`Quorum check failed`)) {
    throw new Error('add node as secondary failed')
  }
}

async function createNewReplicaSet() {
  const mongoCommand = `rs.initiate({'_id': '${mongoReplicaSetName}', 'members': [{'_id': 0, 'host': '${podIp}'}]})`
  try {
    await runMongoAdmin(true, '127.0.0.1', mongoCommand)
  } catch (e) {
    // replica set configuration might popup ony now, so we recheck the replica set status
    console.log(`create replica failed: ${e.stack}`)
    if (!await reconfigureReplicaSetIfPossible()) {
      throw e
    }
  }
  await waitForMasterReady('127.0.0.1')
}

async function reconfigureReplicaSetIfPossible() {
  const result = await runMongoAdmin(true, '127.0.0.1', `rs.status()`)
  if (!result.includes('we are not a member of it')) {
    return false
  }

  await reconfigureReplicaSet()
  return true
}

async function reconfigureReplicaSet() {
  const mongoCommand = `\
    c=rs.conf(); \
    c.members.splice(1); \
    c.members[0].host='${podIp}'; \
    rs.reconfig(c, {force: true}) \
  `

  await runMongoAdmin(true, '127.0.0.1', mongoCommand)
}

async function findPrimaryNode() {
  const ips = await getPodsIps()
  for (let i = 0; i < ips.length; i++) {
    const ip = ips[i]
    if (await isPrimary(ip)) {
      return ip
    }
  }
}

async function isSecondaryPodLocated() {
  const ips = await getPodsIps()
  for (let i = 0; i < ips.length; i++) {
    const ip = ips[i]
    if (await isSecondary(ip)) {
      return true
    }
  }
  return false
}

async function isSecondary(ip) {
  const state = await runMongoAdmin(false, ip, 'rs.status().myState')
  return state === '2'
}

async function isPrimary(ip) {
  const state = await runMongoAdmin(false, ip, 'rs.status().myState')
  if (state !== '1') {
    return false
  }

  await waitForMasterReady(ip)
  return true
}

async function getPodsIps() {
  let args = `get pods -l configid=mongodb-container -o jsonpath='{range.items[*]}{.status.podIP} '`
  const stdout = await kubectl.runKubectl(true, args)
  const ips = []
  stdout.trim().split(' ').forEach(ip => {
    if (ip.trim().length > 0) {
      ips.push(ip)
    }
  })
  return ips
}

async function waitForPing() {
  return await runMongoAdminUntilResponse('127.0.0.1', `db.adminCommand('ping').ok`, '1')
}

async function waitForMasterReady(host) {
  return await runMongoAdminUntilResponse(host, `db.isMaster().ismaster`, 'true')
}

async function waitUntilSecondaryReady() {
  return await runMongoAdminUntilResponse('127.0.0.1', `rs.status().myState`, '2')
}

async function runMongoAdminUntilResponse(host, mongoCommand, expectedResult) {
  const startTime = new Date().getTime()
  while (true) {
    const result = await runMongoAdmin(false, host, mongoCommand)
    if (result === expectedResult) {
      break
    }

    const passedTime = new Date().getTime() - startTime
    if (passedTime > 120000) {
      throw new Error(`timeout waiting for good response from command: ${mongoCommand}\nLast response was: ${result}`)
    }
    await sleep(1000)
  }
  return {
    mongoResponse: true,
  }
}

async function runMongoAdmin(checkError, host, mongoCommand) {
  const commandLine = `mongo admin --host ${host} --quiet --eval "${mongoCommand}"`
  let result = await runCommand(commandLine, {checkError: checkError})
  result = result.trim()
  return result.trim()
}


The code uses the following helpers:


const {exec} = require('child_process')
async function runCommand(commandLine, options) {
  if (options === undefined) {
    options = {}
  }
  if (options.checkError === undefined) {
    options.checkError = true
  }
  if (options.waitForCompletion === undefined) {
    options.waitForCompletion = true
  }
  const promise = new Promise((resolve, reject) => {

    const execHandler = exec(commandLine, (err, stdout, stderr) => {
      let result = ''
      if (stdout) {
        result += stdout
      }
      if (stderr) {
        result += stderr
      }

      if (err) {
        if (options.checkError) {
          reject(err)
        } else {
          resolve(result)
        }
        return
      }

      resolve(result)
    })

    if (options.streamOutputCallback) {
      execHandler.stdout.on('data', (data) => {
        options.streamOutputCallback(data)
      })
    }
  })

  if (options.waitForCompletion) {
    return await promise
  }

  return {promise: promise}
}



async function sleep(time) {
  await new Promise((resolve) => {
    setTimeout(() => {
      resolve()
    }, time)
  })
}

Summary

I've successfully run MongoDB ReplicaSet on kubernetes.
The ReplicaSet has proven to recover from both partial and full restart of the pods.




No comments:

Post a Comment