Monday, April 15, 2024

Streaming Messages to a Go gRPC Server


 

In this post we will demonstrate streaming gRPC messages to a server using Go.


Create proto file

The proto file describes the gRPC service APIs. In this example we create a single API to send stream of persons from the client to the server.


persons.proto

syntax = "proto3";
option go_package = "my.example.com/com/grpctemplates";

service Persons {
rpc StreamPersons(stream Person) returns(ProcessedIndication){

}
}

message Person{
string name = 1;
int32 age = 2;
}

message ProcessedIndication{

}


Generate Go templates

To generate go sources using the proto file, we first install the required tools:

sudo apt install -y protobuf-compiler
protoc --version
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

Next we run the tools to generate the Go templates:

export PATH="$PATH:$(go env GOPATH)/bin"
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
persons.proto

rm -rf grpctemplates
mkdir grpctemplates
mv persons.pb.go grpctemplates/
mv persons_grpc.pb.go grpctemplates/

The Main

In our example we will run both the client and the server in the same process.

package main

import (
"grpcexample/personsclient"
"grpcexample/personsserver"
"time"
)

func main() {
go func() {
time.Sleep(time.Second)
personsclient.RunClient()
}()

personsserver.RunServer()
}


The Server

The server implements an API to get the stream of persons, prints them, and return a complete indication.

package personsserver

import (
"fmt"
"google.golang.org/grpc"
"grpcexample/grpctemplates"
"io"
"log"
"net"
)

type personsServer struct {
grpctemplates.UnimplementedPersonsServer
}

func (s *personsServer) StreamPersons(stream grpctemplates.Persons_StreamPersonsServer) error {
for {
person, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&grpctemplates.ProcessedIndication{})
}
if err != nil {
panic(err)
}

log.Printf("got person %v\n", person.Name)
}
}

func RunServer() {
log.Print("starting gRPC server")
port := 8080
listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
panic(err)
}

persons := personsServer{}
grpcServer := grpc.NewServer()
grpctemplates.RegisterPersonsServer(grpcServer, &persons)

err = grpcServer.Serve(listener)
if err != nil {
panic(err)
}
}

The Client

The client sends a stream of persons to the server, and waits for completion before closing the connection.

package personsclient

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpcexample/grpctemplates"
"log"
)

func RunClient() {
log.Printf("client sending data starting\n")

connection, err := grpc.Dial(
"localhost:8080",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}

defer func() {
err = connection.Close()
if err != nil {
panic(err)
}
}()

client := grpctemplates.NewPersonsClient(connection)
stream, err := client.StreamPersons(context.Background())
if err != nil {
panic(err)
}

for i := range 10 {
person := grpctemplates.Person{
Name: fmt.Sprintf("person %v", i),
Age: int32(i),
}
err = stream.Send(&person)
if err != nil {
panic(err)
}
}
_, err = stream.CloseAndRecv()
if err != nil {
panic(err)
}
log.Printf("client sending data done\n")
}



No comments:

Post a Comment