You have implemented a product based on Kubernetes and Go.
Great.
But what if you have an algorithm that has a heavy computation step?
Suppose the computation step can be split into parts that run in parallel.
One method is to use a local thread pool and send each thread a part of the computation.
Would it solve the problem?
Well, yes, as long as the tasks can be completed in a timely manner on a single machine.
However, if you need more than a single machine, a local thread pool is not the solution.
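For comparison, the single-machine approach might look like the following minimal sketch. It uses a pool of goroutines (Go's lightweight threads) rather than OS threads, and the summation task and the names `task`, `sumRange`, and `parallelSum` are hypothetical stand-ins for the real computation step. The shape (a producer, a task channel, a pool of workers, a result channel) is the same one that the RabbitMQ version below spreads across pods.

```go
package main

import (
	"fmt"
	"sync"
)

// task is one hypothetical part of the computation: sum the integers in [from, to).
type task struct{ from, to int }

// sumRange stands in for the heavy computation step.
func sumRange(t task) int {
	total := 0
	for i := t.from; i < t.to; i++ {
		total += i
	}
	return total
}

// parallelSum splits [0, n) into chunks of chunkSize and processes them on a
// local pool of goroutines. Both chunkSize and workers must be positive.
func parallelSum(n, chunkSize, workers int) int {
	tasks := make(chan task)
	results := make(chan int)

	// Start the pool: each worker consumes tasks until the channel is closed.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				results <- sumRange(t)
			}
		}()
	}

	// Produce the tasks, then signal that no more are coming.
	go func() {
		for from := 0; from < n; from += chunkSize {
			to := from + chunkSize
			if to > n {
				to = n
			}
			tasks <- task{from: from, to: to}
		}
		close(tasks)
	}()

	// Close the results channel once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Combine the partial results as they arrive.
	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(parallelSum(1000, 100, 4)) // prints 499500 (the sum of 0..999)
}
```

This scales only with the cores of one machine, which is exactly the limit described above.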
To distribute the tasks of the computation step among multiple servers, you can use worker pods in Kubernetes. Each pod consumes tasks of the computation step, so the work is distributed among multiple machines.
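The basic idea is to build an RPC (remote procedure call) server on top of a message queue: a central client publishes tasks, and worker pods consume them.
In this post I will present an implementation of this RPC server using Go and RabbitMQ. For details of deploying RabbitMQ on Kubernetes, see this post.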
The RPC server is based on a client that runs in a central location. The client produces computation task requests, which are sent to a requests queue. The RPC server consumers read the requests, process them, and publish the results to a responses queue. The central client waits for all of the responses from the RPC server consumers. This is presented in the following diagram:
Notice that the RPC server consumer can run as a single pod, or as multiple pods to enable distribution of the work among multiple servers.
Example of Usage
The following code is an example of using the RPC client and server. Run it with either "server" or "client" as its argument.
When started as a client, it produces 2 tasks and waits for 2 results on the responses queue.
When started as a server, it waits for tasks, simulates processing each one, and returns a result per task.
```go
package main

import (
	"fmt"
	"os"

	"github.com/alonana/rpc/rpc"
	"github.com/streadway/amqp"
)

func main() {
	// Expect exactly one argument: "server" or "client".
	if len(os.Args) < 2 || (os.Args[1] != "server" && os.Args[1] != "client") {
		fmt.Fprintf(os.Stderr, "usage: %s server|client\n", os.Args[0])
		os.Exit(1)
	}

	rabbitConnection, err := amqp.Dial("amqp://user:pass@127.0.0.1:5672/")
	if err != nil {
		panic(err)
	}
	defer rabbitConnection.Close()

	if os.Args[1] == "server" {
		runServer(rabbitConnection)
	} else {
		runClient(rabbitConnection)
	}
}

func runClient(connection *amqp.Connection) {
	c := rpc.CreateClient(connection)
	c.Start()
	c.Produce([]byte("my-first-request"))
	c.Produce([]byte("my-second-request"))

	// Wait blocks until one response per produced request has arrived.
	for _, response := range c.Wait() {
		fmt.Println(string(response))
	}
}

func runServer(connection *amqp.Connection) {
	s := rpc.CreateServer(connection, consumer)
	s.Start()
	// Serve until the process is killed.
	select {}
}

// consumer simulates the processing of a single task.
func consumer(input rpc.Bytes) rpc.Bytes {
	request := string(input)
	return []byte("I got the request " + request)
}
```
The Consume Queue
Both the client and the server consume messages from a queue: the server consumes from the requests queue, and the client consumes from the responses queue. Hence the code that handles consuming messages from a queue is shared.
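```go
package rpc

import (
	"sync"

	"github.com/pkg/errors"
	"github.com/streadway/amqp"
)

// Queue names shared by the client and the server. Their definitions are not
// shown in the original code; these values are assumed placeholders.
const (
	queueNameRequests  = "rpc-requests"
	queueNameResponses = "rpc-responses"
)

type Bytes []byte

type QueueConsumer func(Bytes)

type ConsumeQueue struct {
	messages       <-chan amqp.Delivery
	channel        *amqp.Channel
	closeChannel   chan bool
	closeWaitGroup sync.WaitGroup
	consumer       QueueConsumer
	consumed       int
	queueName      string
}

// declareQueue declares a non-durable queue. Declaring is idempotent, so both
// sides can call it to make sure a queue exists before publishing to it.
func declareQueue(channel *amqp.Channel, queueName string) error {
	_, err := channel.QueueDeclare(queueName, false, false, false, false, nil)
	return err
}

func createConsumeQueue(connection *amqp.Connection, queueName string, consumer QueueConsumer) *ConsumeQueue {
	q := &ConsumeQueue{
		consumer:     consumer,
		closeChannel: make(chan bool),
		queueName:    queueName,
	}

	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}
	// Prefetch one unacknowledged message at a time, so that the work is
	// spread evenly among several consumers of the same queue.
	err = channel.Qos(1, 0, false)
	if err != nil {
		panic(err)
	}
	q.channel = channel

	err = declareQueue(channel, queueName)
	if err != nil {
		panic(err)
	}

	// The queue name doubles as the consumer tag. autoAck is false, so each
	// message is acknowledged explicitly after it has been processed.
	q.messages, err = channel.Consume(queueName, queueName, false, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	q.closeWaitGroup.Add(1)
	go q.consumeLoop()
	return q
}

func (q *ConsumeQueue) stop() error {
	err := q.channel.Cancel(q.queueName, false)
	if err != nil {
		return errors.Errorf("cancel consumer failed: %v", err)
	}
	// Closing (rather than sending on) the channel never blocks, even if
	// consumeLoop has already returned because Cancel closed the deliveries.
	close(q.closeChannel)
	q.closeWaitGroup.Wait()
	return q.channel.Close()
}

func (q *ConsumeQueue) consumeLoop() {
	defer q.closeWaitGroup.Done()
	for {
		select {
		case <-q.closeChannel:
			return
		case message, ok := <-q.messages:
			if !ok {
				// Cancel closes the deliveries channel; a zero-value Delivery
				// cannot be acknowledged, so stop instead of consuming it.
				return
			}
			q.consume(message)
		}
	}
}

func (q *ConsumeQueue) consume(message amqp.Delivery) {
	q.consumed++
	q.consumer(message.Body)
	err := message.Ack(false)
	if err != nil {
		panic(err)
	}
}
```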
The Server
The server runs forever, waiting for messages in the requests queue, running the consumer to process them, and publishing the results to the responses queue.
```go
package rpc

import (
	"github.com/streadway/amqp"
)

// ServerConsumer processes one request and returns its response.
type ServerConsumer func(Bytes) Bytes

type Server struct {
	rabbitConnection *amqp.Connection
	rabbitChannel    *amqp.Channel // used only for publishing responses
	consumeQueue     *ConsumeQueue
	consumer         ServerConsumer
}

func CreateServer(rabbitConnection *amqp.Connection, consumer ServerConsumer) *Server {
	return &Server{
		rabbitConnection: rabbitConnection,
		consumer:         consumer,
	}
}

func (s *Server) Start() {
	channel, err := s.rabbitConnection.Channel()
	if err != nil {
		panic(err)
	}
	s.rabbitChannel = channel
	s.consumeQueue = createConsumeQueue(s.rabbitConnection, queueNameRequests, s.consumerWrapper)
}

// consumerWrapper runs the user consumer on a request and publishes its result.
func (s *Server) consumerWrapper(data Bytes) {
	result := s.consumer(data)
	publishing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        result,
	}
	// The empty exchange name selects the default exchange, which routes by queue name.
	err := s.rabbitChannel.Publish("", queueNameResponses, false, false, publishing)
	if err != nil {
		panic(err)
	}
}
```
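The `consumer` in the usage example only echoes the request. As a sketch of a consumer that does real work, the following function has the same `func(Bytes) Bytes` shape as `ServerConsumer`. The JSON message format (`rangeTask`, `rangeResult`) and the name `sumConsumer` are assumptions for illustration, not part of the original code. It reports malformed input inside the response instead of panicking, since a panic would take the whole worker pod down.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Bytes mirrors rpc.Bytes so that this sketch compiles on its own.
type Bytes []byte

// rangeTask is a hypothetical request: sum the integers in [From, To).
type rangeTask struct {
	From int `json:"from"`
	To   int `json:"to"`
}

// rangeResult is the matching hypothetical response.
type rangeResult struct {
	From  int    `json:"from"`
	To    int    `json:"to"`
	Sum   int    `json:"sum"`
	Error string `json:"error,omitempty"`
}

// sumConsumer decodes a task, performs the computation, and encodes the result.
// A request that cannot be decoded yields a response with Error set.
func sumConsumer(input Bytes) Bytes {
	var t rangeTask
	var r rangeResult
	if err := json.Unmarshal(input, &t); err != nil {
		r.Error = "invalid request: " + err.Error()
	} else {
		r.From, r.To = t.From, t.To
		for i := t.From; i < t.To; i++ {
			r.Sum += i
		}
	}
	out, err := json.Marshal(r)
	if err != nil {
		// Marshal cannot fail for this struct; the check is kept for completeness.
		panic(err)
	}
	return out
}

func main() {
	fmt.Println(string(sumConsumer(Bytes(`{"from":0,"to":5}`))))
	// prints {"from":0,"to":5,"sum":10}
}
```

To use it with the server above, the function would take and return `rpc.Bytes` and be passed to `rpc.CreateServer` in place of `consumer`.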
The Client
Lastly, the client sends the produced tasks to the requests queue and waits until all responses have been received from the responses queue.
```go
package rpc

import (
	"github.com/streadway/amqp"
)

type Client struct {
	connection    *amqp.Connection
	rabbitChannel *amqp.Channel // used only for publishing requests
	consumeQueue  *ConsumeQueue
	requests      int        // number of requests produced so far
	channel       chan Bytes // hands responses from the consumer goroutine to Wait
}

func CreateClient(connection *amqp.Connection) *Client {
	return &Client{
		connection: connection,
		channel:    make(chan Bytes),
	}
}

func (c *Client) Start() {
	channel, err := c.connection.Channel()
	if err != nil {
		panic(err)
	}
	// Declare the requests queue before publishing: if no server has declared
	// it yet, the default exchange would silently drop the requests.
	err = declareQueue(channel, queueNameRequests)
	if err != nil {
		panic(err)
	}
	c.rabbitChannel = channel
	c.consumeQueue = createConsumeQueue(c.connection, queueNameResponses, c.consumerWrapper)
}

func (c *Client) Produce(request Bytes) {
	publishing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        request,
	}
	c.requests++
	err := c.rabbitChannel.Publish("", queueNameRequests, false, false, publishing)
	if err != nil {
		panic(err)
	}
}

// Wait blocks until one response has been received for every produced
// request, then releases the channels.
func (c *Client) Wait() []Bytes {
	var bytesResponses []Bytes
	for i := 0; i < c.requests; i++ {
		response := <-c.channel
		bytesResponses = append(bytesResponses, response)
	}
	c.rabbitChannel.Close()
	err := c.consumeQueue.stop()
	if err != nil {
		panic(err)
	}
	return bytesResponses
}

func (c *Client) consumerWrapper(data Bytes) {
	c.channel <- data
}
```
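On the client side, the heavy computation has to be split into requests and the responses combined again. The following is a minimal sketch, assuming a hypothetical JSON format in which each request holds a half-open range `[from, to)` and each response carries that range's sum; the server's consumer would have to produce the matching format. The names `splitTasks` and `mergeResults` are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Bytes mirrors rpc.Bytes so that this sketch compiles on its own.
type Bytes []byte

// Hypothetical message formats, shared by convention with the server.
type rangeTask struct {
	From int `json:"from"`
	To   int `json:"to"`
}

type rangeResult struct {
	From  int    `json:"from"`
	To    int    `json:"to"`
	Sum   int    `json:"sum"`
	Error string `json:"error,omitempty"`
}

// splitTasks encodes [0, n) as requests of at most chunkSize integers each
// (chunkSize must be positive). Each element could be passed to Client.Produce.
func splitTasks(n, chunkSize int) []Bytes {
	var requests []Bytes
	for from := 0; from < n; from += chunkSize {
		to := from + chunkSize
		if to > n {
			to = n
		}
		b, err := json.Marshal(rangeTask{From: from, To: to})
		if err != nil {
			panic(err)
		}
		requests = append(requests, b)
	}
	return requests
}

// mergeResults combines the responses returned by Client.Wait. They may
// arrive in any order, which is fine because addition is commutative.
func mergeResults(responses []Bytes) (int, error) {
	total := 0
	for _, resp := range responses {
		var r rangeResult
		if err := json.Unmarshal(resp, &r); err != nil {
			return 0, fmt.Errorf("malformed response: %w", err)
		}
		if r.Error != "" {
			return 0, fmt.Errorf("task [%d, %d) failed: %s", r.From, r.To, r.Error)
		}
		total += r.Sum
	}
	return total, nil
}

func main() {
	fmt.Println(len(splitTasks(10, 4))) // prints 3: [0,4), [4,8), [8,10)
	total, err := mergeResults([]Bytes{
		Bytes(`{"from":4,"to":8,"sum":22}`),
		Bytes(`{"from":0,"to":4,"sum":6}`),
		Bytes(`{"from":8,"to":10,"sum":17}`),
	})
	fmt.Println(total, err) // prints 45 <nil>
}
```

With several worker pods, responses are published in completion order rather than request order, so the merge step should not rely on ordering.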
Final Notes
In this post we have presented a method to distribute work among multiple machines using Go and RabbitMQ. This is only a basic implementation example. For real use, it should be extended to handle errors on both the client and the server side.
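One gap worth handling early: `Client.Wait` blocks forever if a worker pod dies before publishing its response. The following sketch shows a bounded wait that could replace the receive loop inside `Wait`; the function name `waitWithTimeout` and the `errTimeout` value are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Bytes mirrors rpc.Bytes so that this sketch compiles on its own.
type Bytes []byte

// errTimeout is returned when not all responses arrive in time.
var errTimeout = errors.New("timed out waiting for responses")

// waitWithTimeout collects `expected` responses from the channel, giving up
// once `timeout` has elapsed since the call. On timeout it returns the
// responses received so far together with an error wrapping errTimeout.
func waitWithTimeout(responses <-chan Bytes, expected int, timeout time.Duration) ([]Bytes, error) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	var received []Bytes
	for len(received) < expected {
		select {
		case r := <-responses:
			received = append(received, r)
		case <-deadline.C:
			return received, fmt.Errorf("%w: got %d of %d", errTimeout, len(received), expected)
		}
	}
	return received, nil
}

func main() {
	ch := make(chan Bytes, 2)
	ch <- Bytes("first")
	// Only one of the two expected responses ever arrives.
	got, err := waitWithTimeout(ch, 2, 100*time.Millisecond)
	fmt.Println(len(got), err) // prints 1 timed out waiting for responses: got 1 of 2
}
```

On timeout, a late response would still be pushed into the unbuffered `c.channel` by the consumer goroutine and block it there, so a real version would also need to drain or buffer that channel before calling `stop`.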
In addition, each request could include a unique ID, so that the client only handles responses that belong to the current iteration.
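A sketch of that idea using only the standard library: the client draws a random run ID for each iteration, and responses tagged with any other ID (for example, leftovers from an iteration that timed out) are discarded. With `github.com/streadway/amqp`, the ID could travel in the `CorrelationId` field of `amqp.Publishing`, and the server would copy it from the `CorrelationId` of the incoming `amqp.Delivery`; that would require passing the whole delivery, not only its body, to the consumer. The `response` type and the names `newRunID` and `filterRun` are hypothetical.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newRunID returns a random 16-character hex identifier for one client iteration.
func newRunID() string {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// response is a hypothetical view of a received message: the ID the server
// copied from the request, plus the payload.
type response struct {
	CorrelationID string
	Body          []byte
}

// filterRun keeps only the responses that belong to runID; anything else is
// left over from an earlier iteration and is dropped.
func filterRun(runID string, responses []response) [][]byte {
	var mine [][]byte
	for _, r := range responses {
		if r.CorrelationID == runID {
			mine = append(mine, r.Body)
		}
	}
	return mine
}

func main() {
	run := newRunID()
	got := filterRun(run, []response{
		{CorrelationID: "stale-run", Body: []byte("old result")},
		{CorrelationID: run, Body: []byte("new result")},
	})
	fmt.Println(len(got), string(got[0])) // prints 1 new result
}
```

Discarded responses must not be counted toward the number of requests the client is waiting for.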