In this post we will demonstrate streaming gRPC messages to a server using Go.
Create proto file
The proto file describes the gRPC service APIs. In this example we create a single API that sends a stream of persons from the client to the server.
persons.proto
// Use the proto3 language version.
syntax = "proto3";
// Go import path recorded for the generated code.
// NOTE(review): the Go sources in this post import the generated package as
// "grpcexample/grpctemplates" - confirm which import path is intended.
option go_package = "my.example.com/com/grpctemplates";
// Persons exposes a single client-streaming RPC.
service Persons {
// StreamPersons accepts a stream of Person messages from the client and
// answers with one ProcessedIndication.
rpc StreamPersons(stream Person) returns(ProcessedIndication){
}
}
// Person is one element of the stream sent by the client.
message Person{
string name = 1;
int32 age = 2;
}
// ProcessedIndication is the (empty) response of StreamPersons.
message ProcessedIndication{
}
Generate Go templates
To generate Go sources from the proto file, we first install the required tools:
# Install the protocol buffer compiler from the apt package.
sudo apt install -y protobuf-compiler
# Print the compiler version to confirm the installation.
protoc --version
# Install the protoc plugins that emit the Go message code and the Go gRPC code.
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
Next we run the tools to generate the Go templates:
# Make the plugins installed under GOPATH/bin visible to protoc.
export PATH="$PATH:$(go env GOPATH)/bin"
# Run both Go plugins on persons.proto, writing their output to the current directory.
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
persons.proto
# Recreate the grpctemplates directory from scratch and move the generated files into it.
rm -rf grpctemplates
mkdir grpctemplates
mv persons.pb.go grpctemplates/
mv persons_grpc.pb.go grpctemplates/
The Main
In our example we will run both the client and the server in the same process.
package main
import (
"grpcexample/personsclient"
"grpcexample/personsserver"
"time"
)
// main runs the example client and server inside a single process.
func main() {
	// The client is launched in the background and waits one second before
	// it starts, giving the server time to begin listening.
	startClient := func() {
		time.Sleep(time.Second)
		personsclient.RunClient()
	}
	go startClient()

	// The server runs on the main goroutine.
	personsserver.RunServer()
}
The Server
The server implements the API: it receives the stream of persons, prints each one, and returns a completion indication.
package personsserver
import (
"fmt"
"google.golang.org/grpc"
"grpcexample/grpctemplates"
"io"
"log"
"net"
)
// personsServer is the implementation of the Persons gRPC service that is
// registered with the gRPC server in RunServer.
type personsServer struct {
// Embedding the generated Unimplemented type supplies the generated default
// methods for the service.
grpctemplates.UnimplementedPersonsServer
}
// StreamPersons handles the client-streaming RPC. It reads Person messages
// from the stream and logs each person's name until the client finishes
// sending, then replies with a single empty ProcessedIndication.
//
// It returns the error from SendAndClose, or the first non-EOF error returned
// by Recv.
func (s *personsServer) StreamPersons(stream grpctemplates.Persons_StreamPersonsServer) error {
	for {
		person, err := stream.Recv()
		if err == io.EOF {
			// The client has finished sending: answer and end the RPC.
			return stream.SendAndClose(&grpctemplates.ProcessedIndication{})
		}
		if err != nil {
			// Return the error instead of panicking, so that a failure on one
			// client stream ends only this RPC and not the whole server.
			return err
		}
		log.Printf("got person %v\n", person.Name)
	}
}
func RunServer() {
log.Print("starting gRPC server")
port := 8080
listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
panic(err)
}
persons := personsServer{}
grpcServer := grpc.NewServer()
grpctemplates.RegisterPersonsServer(grpcServer, &persons)
err = grpcServer.Serve(listener)
if err != nil {
panic(err)
}
}
The Client
The client sends a stream of persons to the server, and waits for completion before closing the connection.
package personsclient
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpcexample/grpctemplates"
"log"
)
// RunClient connects to the Persons gRPC server at localhost:8080 without
// transport security, streams ten Person messages to it, and waits for the
// server's response before the connection is closed. Any failure along the
// way causes a panic.
func RunClient() {
	log.Printf("client sending data starting\n")

	conn, dialErr := grpc.Dial(
		"localhost:8080",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if dialErr != nil {
		panic(dialErr)
	}
	// Close the connection on the way out; a failed close is fatal as well.
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			panic(closeErr)
		}
	}()

	personsClient := grpctemplates.NewPersonsClient(conn)
	personStream, streamErr := personsClient.StreamPersons(context.Background())
	if streamErr != nil {
		panic(streamErr)
	}

	// Send persons "person 0" through "person 9", using the index as the age.
	for i := 0; i < 10; i++ {
		msg := &grpctemplates.Person{
			Name: fmt.Sprintf("person %v", i),
			Age:  int32(i),
		}
		if sendErr := personStream.Send(msg); sendErr != nil {
			panic(sendErr)
		}
	}

	// Finish sending and wait for the server's reply; its content is not used.
	if _, recvErr := personStream.CloseAndRecv(); recvErr != nil {
		panic(recvErr)
	}

	log.Printf("client sending data done\n")
}
No comments:
Post a Comment