Thursday, March 26, 2020

RPC Server using GO and RabbitMQ


You have implemented a product based on Kubernetes and Go.
Great.

But what if you have an algorithm that has a heavy computation step?
Suppose you can split the computation step to run in parallel?

One method is to use a local thread pool, and send to each thread a part of the computation.
Would it solve the problem?
Well, yes, as long as the tasks can be completed in a timely manner within a single machine.
However, if you need more than a single machine, a local thread pool is not the solution.

To distribute the computation tasks among multiple servers, you can use worker pods in Kubernetes. Each pod consumes tasks of the computation step, and hence the work can be distributed among multiple machines.

So the basic idea is as follows:




In this post I will present an implementation of this RPC server using Go and RabbitMQ. For details of deploying RabbitMQ on Kubernetes, see this post.

The RPC server is driven by a client that runs in a central location. The client produces computational task requests that are sent to a requests queue. The RPC server consumers read the requests, process them, and then return the results to the responses queue. The central client waits for all of the responses from the RPC server consumers. This is presented in the following diagram:





Notice that the RPC server consumer can run as a single pod, or as multiple pods to enable distribution of the work among multiple servers.


Example of Usage

The following code is an example of usage of the RPC client/server.
It can be run with either "server" or "client" as its argument.

When started as client, it produces 2 tasks, and waits for 2 results on the results queue.

When started as server, it waits for tasks, and simulates processing by returning one result for each task.


package main
import (
   "github.com/alonana/rpc/rpc"
   "github.com/streadway/amqp"
   "os"
   "time"
)

// main connects to RabbitMQ and runs either the RPC server or the RPC client.
//
// The first command line argument selects the mode: "server" starts the
// server, any other value starts the client.
func main() {
   // Validate the arguments before touching os.Args[1]; without this check a
   // missing argument crashes with an index-out-of-range panic.
   if len(os.Args) < 2 {
      println("missing argument: expected \"server\" or \"client\"")
      os.Exit(1)
   }

   rabbitConnection, err := amqp.Dial("amqp://user:pass@127.0.0.1:5672/")
   if err != nil {
      panic(err)
   }
   // Best-effort cleanup: the process is about to exit, so a failure to close
   // the connection cannot be handled in any useful way.
   defer rabbitConnection.Close()

   if os.Args[1] == "server" {
      runServer(rabbitConnection)
   } else {
      runClient(rabbitConnection)
   }
}

// runClient publishes two sample requests through the RPC client, waits for
// the matching number of responses, and prints each response on its own line.
func runClient(connection *amqp.Connection) {
   client := rpc.CreateClient(connection)
   client.Start()

   for _, request := range []string{"my-first-request", "my-second-request"} {
      client.Produce([]byte(request))
   }

   for _, response := range client.Wait() {
      println(string(response))
   }
}

// runServer starts the RPC server with consumer as the request handler and
// then blocks forever.
func runServer(connection *amqp.Connection) {
   s := rpc.CreateServer(connection, consumer)
   s.Start()

   // Start returns as soon as the background consume loop is running, so this
   // goroutine must stay alive for the server to keep serving. A single
   // one-hour sleep would let main return, and the process exit, after an hour.
   for {
      time.Sleep(time.Hour)
   }
}

// consumer is the sample request handler: it answers every request with a
// fixed prefix followed by the request text.
func consumer(input rpc.Bytes) rpc.Bytes {
   const prefix = "I got the request "
   return rpc.Bytes(prefix + string(input))
}


The Consume Queue


Both the client and the server consume messages from a queue. The server consumes messages from the requests queue, and the client consumes messages from the responses queue. Hence shared code is included to handle consuming messages from a queue.



package rpc
import (
   "github.com/pkg/errors"
   "github.com/streadway/amqp"
   "sync"
)

// Bytes is the raw payload of a request or a response message.
type Bytes []byte
// QueueConsumer is the callback that receives the body of each consumed message.
type QueueConsumer func(Bytes)

// ConsumeQueue consumes messages from a single RabbitMQ queue in a background
// goroutine and hands each message body to a QueueConsumer callback.
type ConsumeQueue struct {
   // messages is the delivery stream returned by channel.Consume.
   messages       <-chan amqp.Delivery
   // channel is the AMQP channel used to declare the queue and consume from it.
   channel        *amqp.Channel
   // closeChannel signals the consume loop to exit.
   closeChannel   chan bool
   // closeWaitGroup lets stop wait until the consume loop has returned.
   closeWaitGroup sync.WaitGroup
   // consumer is the callback invoked with the body of every message.
   consumer       QueueConsumer
   // consumed counts the messages handed to the consumer so far.
   consumed       int
   // queueName is the queue name; it is also used as the consumer tag.
   queueName      string
}

// createConsumeQueue opens a dedicated AMQP channel on connection, declares
// queueName, registers a consumer on it, and starts a background goroutine
// that passes the body of every delivery to consumer.
//
// Any failure while setting up the channel, the queue, or the consumer panics.
func createConsumeQueue(connection *amqp.Connection, queueName string, consumer QueueConsumer) *ConsumeQueue {
   channel, err := connection.Channel()
   if err != nil {
      panic(err)
   }

   // Prefetch a single unacknowledged message at a time.
   if err = channel.Qos(1, 0, false); err != nil {
      panic(err)
   }

   if _, err = channel.QueueDeclare(queueName, false, false, false, false, nil); err != nil {
      panic(err)
   }

   // The queue name doubles as the consumer tag; messages are acknowledged
   // manually (auto-ack is off).
   deliveries, err := channel.Consume(queueName, queueName, false, false, false, false, nil)
   if err != nil {
      panic(err)
   }

   queue := &ConsumeQueue{
      messages:     deliveries,
      channel:      channel,
      closeChannel: make(chan bool),
      consumer:     consumer,
      queueName:    queueName,
   }

   // Register the loop before starting it, so stop can wait for its exit.
   queue.closeWaitGroup.Add(1)
   go queue.consumeLoop()
   return queue
}

func (q *ConsumeQueue) stop() error {
   err := q.channel.Cancel(q.queueName, false)
   if err != nil {
      return errors.Errorf("cancel consumer failed: %v", err)
   }
   q.closeChannel <- true   q.closeWaitGroup.Wait()

   q.channel.Close()
   return nil
}

func (q *ConsumeQueue) consumeLoop() {
   for {
      select {
      case <-q.closeChannel:
         q.closeWaitGroup.Done()
         return      case message := <-q.messages:
         q.consume(message)
         break      
       }
   }
}

// consume counts the delivery, passes its body to the consumer callback, and
// acknowledges the message once the callback has returned.
//
// It panics if the acknowledgement fails.
func (q *ConsumeQueue) consume(message amqp.Delivery) {
   q.consumed += 1
   q.consumer(message.Body)

   // Acknowledge only this delivery (multiple = false).
   if ackErr := message.Ack(false); ackErr != nil {
      panic(ackErr)
   }
}


The Server


The server will run forever, waiting for messages in the requests queue, running the consumer to process them, and returning the results to the responses queue.


package rpc
import (
   "github.com/streadway/amqp"
)

// ServerConsumer processes a single request payload and returns the response payload.
type ServerConsumer func(Bytes) Bytes
// Server consumes requests from the requests queue, runs them through a
// ServerConsumer, and publishes each result to the responses queue.
type Server struct {
   // rabbitConnection is the RabbitMQ connection supplied by the caller.
   rabbitConnection *amqp.Connection
   // rabbitChannel is the AMQP channel used to publish responses.
   rabbitChannel    *amqp.Channel
   // consumeQueue reads requests from the requests queue.
   consumeQueue     *ConsumeQueue
   // consumer is the callback that turns a request into a response.
   consumer         ServerConsumer
}

// CreateServer returns a Server that uses rabbitConnection and processes every
// request with consumer. The server does nothing until Start is called.
func CreateServer(rabbitConnection *amqp.Connection, consumer ServerConsumer) *Server {
   server := new(Server)
   server.rabbitConnection = rabbitConnection
   server.consumer = consumer
   return server
}

func (s *Server) Start() {
   var err error   channel, err := s.rabbitConnection.Channel()
   if err != nil {
      panic(err)
   }

   err = channel.Qos(1, 0, false)
   if err != nil {
      panic(err)
   }
   s.rabbitChannel = channel
   s.consumeQueue = createConsumeQueue(s.rabbitConnection, queueNameRequests, s.consumerWrapper)
}

// consumerWrapper runs the server's consumer on a request payload and
// publishes the result to the responses queue as a text/plain message.
//
// It panics if publishing fails.
func (s *Server) consumerWrapper(data Bytes) {
   response := amqp.Publishing{
      ContentType: "text/plain",
      Body:        s.consumer(data),
   }

   // An empty exchange name routes the message by queue name.
   if err := s.rabbitChannel.Publish("", queueNameResponses, false, false, response); err != nil {
      panic(err)
   }
}


The Client


Lastly, the client sends the produced tasks to the requests queue, and waits until all responses are received from the responses queue.


package rpc
import (
   "github.com/streadway/amqp"
   "math/rand"
   "strconv"
)

// Client publishes requests to the requests queue and collects the responses
// from the responses queue.
type Client struct {
   // connection is the RabbitMQ connection supplied by the caller.
   connection    *amqp.Connection
   // rabbitChannel is the AMQP channel used to publish requests.
   rabbitChannel *amqp.Channel
   // consumeQueue reads responses from the responses queue.
   consumeQueue  *ConsumeQueue
   // requests counts the requests produced so far; Wait collects this many responses.
   requests      int
   // channel hands each response from consumerWrapper over to Wait.
   channel       chan Bytes
}

// CreateClient returns a Client bound to connection, with an unbuffered
// channel for handing responses over to Wait. The client does nothing until
// Start is called.
func CreateClient(connection *amqp.Connection) *Client {
   client := new(Client)
   client.connection = connection
   client.channel = make(chan Bytes)
   return client
}

func (c *Client) Start() {
   var err error   channel, err := c.connection.Channel()
   if err != nil {
      panic(err)
   }

   err = channel.Qos(1, 0, false)
   if err != nil {
      panic(err)
   }
   c.rabbitChannel = channel
   c.consumeQueue = createConsumeQueue(c.connection, queueNameResponses, c.consumerWrapper)
}

// Produce publishes request to the requests queue as a text/plain message and
// counts it, so that Wait knows how many responses to collect.
//
// It panics if publishing fails.
func (c *Client) Produce(request Bytes) {
   c.requests++

   // An empty exchange name routes the message by queue name.
   err := c.rabbitChannel.Publish("", queueNameRequests, false, false, amqp.Publishing{
      ContentType: "text/plain",
      Body:        request,
   })
   if err != nil {
      panic(err)
   }
}

func (c *Client) Wait() []Bytes {
   var bytesResponses []Bytes   for i := 0; i < c.requests; i++ {
      response := <-c.channel      bytesResponses = append(bytesResponses, response)
   }

   c.rabbitChannel.Close()

   err := c.consumeQueue.stop()
   if err != nil {
      panic(err)
   }

   return bytesResponses}

// consumerWrapper forwards a response payload to Wait through the client's
// channel. The channel is unbuffered, so this blocks until Wait receives it.
func (c *Client) consumerWrapper(data Bytes) {
   responses := c.channel
   responses <- data
}


Final Notes


In this post we have presented a method to distribute work among multiple machines using Go and RabbitMQ. This is only a basic implementation example. If needed, it should be extended to handle errors on both the client and the server side.
In addition, each request could include a unique ID to ensure that we are only handling the request/response of the current iteration.

Liked this post? Leave a comment.


No comments:

Post a Comment