Monday, April 15, 2024

Streaming Messages to a Go gRPC Server


 

In this post we will demonstrate streaming gRPC messages to a server using Go.


Create proto file

The proto file describes the gRPC service APIs. In this example we create a single API that sends a stream of persons from the client to the server.


persons.proto

syntax = "proto3";

option go_package = "my.example.com/com/grpctemplates";

service Persons {
    rpc StreamPersons(stream Person) returns (ProcessedIndication) {}
}

message Person {
    string name = 1;
    int32 age = 2;
}

message ProcessedIndication {
}


Generate Go templates

To generate Go sources from the proto file, we first install the required tools:

sudo apt install -y protobuf-compiler
protoc --version
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

Next we run the tools to generate the Go templates:

export PATH="$PATH:$(go env GOPATH)/bin"
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
persons.proto

rm -rf grpctemplates
mkdir grpctemplates
mv persons.pb.go grpctemplates/
mv persons_grpc.pb.go grpctemplates/

The Main

In our example we will run both the client and the server in the same process.

package main

import (
    "grpcexample/personsclient"
    "grpcexample/personsserver"
    "time"
)

func main() {
    // Give the server a moment to start before the client connects.
    go func() {
        time.Sleep(time.Second)
        personsclient.RunClient()
    }()

    personsserver.RunServer()
}


The Server

The server implements the API: it receives the stream of persons, prints each one, and returns a completion indication.

package personsserver

import (
    "fmt"
    "io"
    "log"
    "net"

    "google.golang.org/grpc"

    "grpcexample/grpctemplates"
)

type personsServer struct {
    grpctemplates.UnimplementedPersonsServer
}

func (s *personsServer) StreamPersons(stream grpctemplates.Persons_StreamPersonsServer) error {
    for {
        person, err := stream.Recv()
        if err == io.EOF {
            // The client finished sending; acknowledge and close the stream.
            return stream.SendAndClose(&grpctemplates.ProcessedIndication{})
        }
        if err != nil {
            return err
        }

        log.Printf("got person %v\n", person.Name)
    }
}

func RunServer() {
    log.Print("starting gRPC server")
    port := 8080
    listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
    if err != nil {
        panic(err)
    }

    persons := personsServer{}
    grpcServer := grpc.NewServer()
    grpctemplates.RegisterPersonsServer(grpcServer, &persons)

    err = grpcServer.Serve(listener)
    if err != nil {
        panic(err)
    }
}

The Client

The client sends a stream of persons to the server, and waits for completion before closing the connection.

package personsclient

import (
    "context"
    "fmt"
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    "grpcexample/grpctemplates"
)

func RunClient() {
    log.Printf("client sending data starting\n")

    connection, err := grpc.Dial(
        "localhost:8080",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        panic(err)
    }

    defer func() {
        err = connection.Close()
        if err != nil {
            panic(err)
        }
    }()

    client := grpctemplates.NewPersonsClient(connection)
    stream, err := client.StreamPersons(context.Background())
    if err != nil {
        panic(err)
    }

    for i := range 10 {
        person := grpctemplates.Person{
            Name: fmt.Sprintf("person %v", i),
            Age:  int32(i),
        }
        err = stream.Send(&person)
        if err != nil {
            panic(err)
        }
    }

    // Signal end of stream and wait for the server's acknowledgement.
    _, err = stream.CloseAndRecv()
    if err != nil {
        panic(err)
    }
    log.Printf("client sending data done\n")
}



Monday, April 8, 2024

GraphQL Usage in Go


 


This post includes an example of GraphQL usage in Go. The actual GraphQL implementation is using the graphql-go library.


Some insights I got from this:

  • Implementing a service in GraphQL is much more complicated than using REST. We need to re-write the specifications for each usage and repeat the field names; it is not built into the language the way simple JSON marshaling is in Go.
  • To use a mutation, drop the root braces in the query. I spent an hour trying to understand why it was not working.
  • I believe the main advantage of GraphQL over REST is dynamic field selection, though I don't know many applications where this matters.
  • Authorization is not fully built into GraphQL. I guess this is why we see so many security issues in applications using GraphQL.
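The mutation gotcha above, side by side: the anonymous root-braces shorthand works only for queries, while a mutation must start with the mutation keyword instead:

```graphql
# query shorthand: bare root braces are fine
{
  list {
    name
  }
}

# mutation: the mutation keyword replaces the bare root braces
mutation {
  create(name: "Meryl Streep", birthYear: 1949) {
    name
  }
}
```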



package main

import (
    "encoding/json"
    "fmt"

    "github.com/graphql-go/graphql"
)

type dbActor struct {
    Name      string
    BirthYear int
}

var dbActors = []*dbActor{
    {
        Name:      "John Travolta",
        BirthYear: 1954,
    },
    {
        Name:      "Robert Redford",
        BirthYear: 1936,
    },
}

func main() {
    schema := createSchema()
    var query string

    query = `
    {
        list {
            name
            birthYear
        }
    }
    `
    runQuery(schema, query)

    query = `
    {
        actor(name: "Robert Redford") {
            birthYear
        }
    }
    `
    runQuery(schema, query)

    query = `
    mutation {
        create(name: "Meryl Streep", birthYear: 1949) {
            name
        }
    }
    `
    runQuery(schema, query)

    query = `
    {
        list {
            name
            birthYear
        }
    }
    `
    runQuery(schema, query)
}

func runQuery(schema graphql.Schema, query string) {
    params := graphql.Params{
        Schema:        schema,
        RequestString: query,
    }

    result := graphql.Do(params)
    if len(result.Errors) > 0 {
        panic(fmt.Errorf("failed to execute graphql operation, errors: %+v", result.Errors))
    }

    jsonData, err := json.MarshalIndent(result, "", " ")
    if err != nil {
        panic(err)
    }

    fmt.Printf("\n%s\n", jsonData)
}

func createSchema() graphql.Schema {
    schemaActor := graphql.NewObject(graphql.ObjectConfig{
        Name: "actor",
        Fields: graphql.Fields{
            "name": &graphql.Field{
                Type: graphql.String,
            },
            "birthYear": &graphql.Field{
                Type: graphql.Int,
            },
        },
    })

    schemaQuery := graphql.NewObject(graphql.ObjectConfig{
        Name: "QueryRoot",
        Fields: graphql.Fields{
            "list": &graphql.Field{
                Type: graphql.NewList(schemaActor),
                Resolve: func(params graphql.ResolveParams) (interface{}, error) {
                    return dbActors, nil
                },
            },
            "actor": &graphql.Field{
                Type: schemaActor,
                Args: graphql.FieldConfigArgument{
                    "name": &graphql.ArgumentConfig{
                        Type: graphql.String,
                    },
                },
                Resolve: func(params graphql.ResolveParams) (interface{}, error) {
                    name, ok := params.Args["name"].(string)
                    if !ok {
                        panic("name argument invalid")
                    }
                    for _, actor := range dbActors {
                        if actor.Name == name {
                            return actor, nil
                        }
                    }
                    return nil, nil
                },
            },
        },
    })

    schemaMutation := graphql.NewObject(graphql.ObjectConfig{
        Name: "Mutation",
        Fields: graphql.Fields{
            "create": &graphql.Field{
                Type: schemaActor,
                Args: graphql.FieldConfigArgument{
                    "name": &graphql.ArgumentConfig{
                        Type: graphql.NewNonNull(graphql.String),
                    },
                    "birthYear": &graphql.ArgumentConfig{
                        Type: graphql.NewNonNull(graphql.Int),
                    },
                },
                Resolve: func(params graphql.ResolveParams) (interface{}, error) {
                    name, ok := params.Args["name"].(string)
                    if !ok {
                        panic("name argument invalid")
                    }
                    birthYear, ok := params.Args["birthYear"].(int)
                    if !ok {
                        panic("birthYear argument invalid")
                    }
                    actor := dbActor{
                        Name:      name,
                        BirthYear: birthYear,
                    }
                    dbActors = append(dbActors, &actor)
                    return actor, nil
                },
            },
        },
    })

    schemaConfig := graphql.SchemaConfig{
        Query:    schemaQuery,
        Mutation: schemaMutation,
    }
    schema, err := graphql.NewSchema(schemaConfig)
    if err != nil {
        panic(err)
    }
    return schema
}





Monday, April 1, 2024

AWS Application Load Balancer Vs. AWS Network Load Balancer



In this post we will review the AWS load balancer types, and investigate the tasks each load balancer is suitable for. We will review this from the point of view of a load balancer serving clients that access services in a Kubernetes cluster.


ALB: AWS Application Load Balancer

An ALB operates at communication layer 7, the application layer.

An ALB distributes traffic among the pods of multiple Kubernetes services. A single ALB can serve multiple services, selecting the target service by traffic metadata such as host name and path. Once a service is located, the ALB routes the HTTP request directly to a related pod, distributing requests among all the pods that are READY for service. Notice that the ALB's health check differs from the liveness and readiness probes configured on the pod, and if the check is not the default access to the root path, a dedicated ALB configuration should be made.
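When using the AWS Load Balancer Controller, the ALB health check can be overridden through Ingress annotations. A minimal sketch (the Ingress name and path here are examples; the annotation keys come from the controller's documentation):

```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: my-ingress
  annotations:
    alb.ingress.kubernetes.io/healthcheck-path: /healthz
    alb.ingress.kubernetes.io/healthcheck-interval-seconds: "15"
```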

An ALB handles both HTTP 1.x and HTTP 2.x requests, which means it can also handle the gRPC protocol.

An ALB can also supply additional layer 7 functions such as authentication and WAF.

NLB: AWS Network Load Balancer

An NLB operates at communication layer 4, the transport layer.

An NLB distributes incoming UDP/TCP connections among the pods of a single Kubernetes service. A single NLB can serve only a single service.
The incoming connection is proxied to one of the READY service pods. Once the NLB has established the TCP connection to the target pod, it stays connected as long as the client and the pod keep the connection active. The NLB does not inspect the data transferred in the TCP connection, and hence cannot distribute requests of an existing connection to another pod.

Which Should I use?

Let's start with the price: an ALB costs $0.008 per LCU-hour, while an NLB costs $0.006 per NLCU-hour.

So an ALB costs more, but it has a major advantage: it distributes per request, not per TCP connection. This is critical when we have multiple pods and long-lasting client connections.

An ALB's IP address must be resolved through DNS, while an NLB can use a static IP address; hence if we cannot use DNS, we must use an NLB.


Hence, to choose the appropriate load balancer type:

  • If we must use a static IP address - use NLB
  • If we use a non-HTTP protocol - use NLB
  • If we have a single service with a single pod - use NLB
  • If we have a single service with short-lived client connections - use NLB
  • For all other cases - use ALB






Monday, March 25, 2024

Using KEDA with Prometheus Scaler


 

In this post we will show a simple usage of KEDA with the Prometheus scaler to set the replicas number of a deployment.


KEDA is an event driven autoscaler. It wraps the Kubernetes horizontal pod autoscaler, simplifying autoscaling while enabling scaling by a huge collection of scale metrics sources.


Deploy KEDA

Deploying KEDA is done using its Helm chart:

helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda --namespace keda --create-namespace

Deploy a ScaledObject

A ScaledObject is a configuration of the required auto scaling. We deploy the following ScaledObject:


apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: my-scaledobject
  namespace: default
spec:
  minReplicaCount: 1
  maxReplicaCount: 5
  pollingInterval: 5
  cooldownPeriod: 30
  scaleTargetRef:
    name: my-deployment
    kind: Deployment
    apiVersion: apps/v1
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus-service.default:80
        threshold: '100'
        query: sum(rate(handled_items[1m]))


We specify the min and max replicas, as well as the polling interval and the cooldown interval.

The scaled object target is a deployment.

The metric source is Prometheus, whose service address must be supplied. The scaling is driven by a Prometheus metric value. In this example we set the threshold to 100, which means that when the average metric value per pod rises above 100, KEDA will scale up the replicas amount.





Monday, March 18, 2024

Creating graphs using a Graphviz DOT file

 

Graphviz is open source graph visualization software. One of the covered aspects is the standard DOT file format, which describes a graph. These graphs can later be visualized using related software, and also on online visualization sites such as Graphviz Online.

In this post we will explore the various capabilities of the DOT file.

For more amazing graphs, see the gallery.

A simple undirected graph


graph test {
a -- b -- c;
}




A simple directed graph

digraph test {
a -> b -> c;
}


Multiple graphs with styles


  • To have a sub-graph in its own box: use the prefix "cluster" in the graph name.
  • Edge connections:
    • node to node edges
    • cluster to cluster edges
  • Use "node" to apply attributes to all nodes in the scope

digraph {

    compound=true;

    subgraph cluster_a {
        label="A";
        node [style=filled fillcolor="#ff00ff" fontcolor="white" shape=box];
        a1[shape=star];
        a1 -> {a2 a3};
    }

    subgraph cluster_b {
        label="B";
        b1 -> b2;
    }

    a1 -> b1[label="nodes edge"];
    a2 -> b2[label="clusters edge" ltail="cluster_a" lhead="cluster_b"];
}




Record node

  • Enables multiple segments in a node.
  • We can tag a segment, and use it later in an edge.
digraph {

    subgraph x {
        node[shape="record"];
        a[label="{line 1|<x>line2}"];
        b[label="{line1|{line2_col1|<y>line2_col2|line2_col3}}"];
        a:x->b:y;
    }

}





Use HTML in a label

We can use HTML for a label, but only in a table format.

digraph {

    staging [
        label=<<table border="0" cellborder="1" cellspacing="0" cellpadding="4">
            <tr> <td> <b>important</b></td> </tr>
            <tr> <td> to have fun</td> </tr>
        </table>>
        shape=plain
    ]

}








Monday, March 11, 2024

Dynamically Allocate and Delete PersistentVolumeClaim for a CronJob


 


In this post we will review the steps to dynamically create and delete a PersistentVolumeClaim for a CronJob.

A CronJob might require a large amount of temporary storage, and we don't want to keep the PersistentVolumeClaim active while the job is not running, since the cost might be high. For example, assume we have a CronJob running once a week for 3 hours, requiring 1TB of disk space for its calculations. The cost of leaving such a disk active for an entire week is very high, hence we should dynamically allocate and remove the disk.

Kubernetes does not supply an out-of-the-box mechanism to handle this, hence we do it ourselves. We handle this with 3 CronJobs:

1. The allocate CronJob, which creates the PVC

2. The actual computation CronJob

3. The cleanup CronJob


The Allocate CronJob

This CronJob creates the PVC before the computation job starts. It should use a schedule that runs it just a few minutes before the computation job. The following are the Kubernetes entities to run the allocate CronJob. Notice that we must also provide permissions for handling the PVC.

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: allocate-role
rules:
  - apiGroups: [ "" ]
    resources: [ "persistentvolumes" ]
    verbs: ["create","list","delete","get","patch","watch"]
  - apiGroups: [ "" ]
    resources: [ "persistentvolumeclaims" ]
    verbs: ["create","list","delete","get","patch","watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: allocate-role-binding
subjects:
  - kind: ServiceAccount
    name: allocate-service-account
    namespace: default
roleRef:
  kind: ClusterRole
  name: allocate-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: allocate-service-account
  namespace: default
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: allocate-config
data:
  pvc.yaml: |-
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: my-pvc
      labels:
        type: local
    spec:
      storageClassName: "gp2"
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 1000Gi
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: allocate-cronjob
spec:
  schedule: "0 0 * * *"
  startingDeadlineSeconds: 36000
  concurrencyPolicy: Replace
  timeZone: "Etc/UTC"
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: allocate-service-account
          restartPolicy: Never
          containers:
            - name: allocate
              image: repo/allocate/dev:latest
              imagePullPolicy: IfNotPresent
              env:
                - name: PVC_NAME
                  value: my-pvc
                - name: NAMESPACE
                  value: default
              volumeMounts:
                - name: config
                  mountPath: /config
          volumes:
            - name: config
              configMap:
                name: allocate-config

The allocate image is a simple script that runs kubectl to create the PVC:

#!/usr/bin/env bash
set -e
set -x

echo "prepare starting"

kubectl delete pvc ${PVC_NAME} --namespace ${NAMESPACE} --ignore-not-found=true
kubectl apply -f /config/pvc.yaml

echo "prepare done"




The Cleanup CronJob


The cleanup CronJob runs after the computation job completes and deletes the PVC. It includes the following Kubernetes entities:


---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: cleanup-role
rules:
  - apiGroups: [ "" ]
    resources: [ "persistentvolumeclaims" ]
    verbs: ["create","list","delete","get","patch","watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: cleanup-role-binding
subjects:
  - kind: ServiceAccount
    name: cleanup-service-account
    namespace: default
roleRef:
  kind: ClusterRole
  name: cleanup-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: cleanup-service-account
  namespace: default
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cleanup-cronjob
spec:
  schedule: "0 4 * * *"
  startingDeadlineSeconds: 36000
  concurrencyPolicy: Replace
  timeZone: "Etc/UTC"
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: cleanup-service-account
          restartPolicy: Never
          containers:
            - name: cleanup
              image: repo/cleanup/dev:latest
              imagePullPolicy: IfNotPresent
              env:
                - name: PVC_NAME
                  value: my-pvc
                - name: NAMESPACE
                  value: default


The cleanup image runs the following script:


#!/usr/bin/env bash
set -e
set -x

echo "cleanup starting"

kubectl delete pvc ${PVC_NAME} --namespace ${NAMESPACE} --ignore-not-found=true


echo "cleanup done"





Monday, February 26, 2024

Dall-E 3 Advanced Prompt Guidelines

 



OpenAI recently released Dall-E 3, a great improvement over the Dall-E 2 image generator. You may have used it and generated some images using simple text prompts. The real "art" in this game is understanding prompts and how to design them to get accurate results.


The image at the top of this blog was generated using the prompt:


a photo of Salvador Dali drawing on a screen of a laptop


While it is a nice image, I had a different idea in mind when supplying the prompt. How can we create a better prompt that will generate the image we had in mind?

The general guideline is to split the prompt into multiple statements, each statement adding more requirements for the image generator, for example:


Main object: The main object is the artist Salvador Dali. Behavior: Salvador Dali is painting a cute puppy on a laptop. Environment: The artist is located in an artist studio room. Items: There will be various painting related items spread in a mess around in the room. These items include: colors palette, colors buckets, various brushes in different size. Add additional painting related tools. Colors: Use brownish and yellowish background colors


And the result is:




We can see a great improvement. Though still not perfect, the image generator is starting to get the idea we have in mind. Now that we have the right concept, we can add more statements, or update the existing statements, as if we're programming a multi-layer application.

Let's give it another try:


Main object: The main object is the artist Salvador Dali. Behavior: Salvador Dali stands and draws a cute puppy on a laptop. Environment: The artist is located in an artist studio room. Items: There will be various painting related items spread in a mess around in the room. These items include: colors palette, colors buckets, various brushes in different size. Add additional painting related tools. Colors: Use brownish and yellowish background colors Point of View: The point of view is diagonal for the top right side toward the bottom left direction.


And this time, we're almost there:




Now a final touch:


Main object: The main object is the artist Salvador Dali. Behavior: Salvador Dali stands and draws a drawing of cute puppy on a big laptop. Body language: Salvador Dali's face expression is very busy. Environment: The artist is located in an artist studio room. Items: There will be various painting related items spread in a mess around in the room. These items include: colors palette, colors buckets, various brushes in different size. Add additional painting related tools. Colors: Use brownish and yellowish background colors Point of View: The point of view is diagonal for the top right side toward the bottom left direction. We can see both Salvador Dali's face and the laptop