GoLang Integration with Cilium and Hubble in a Kubernetes Cluster


In this post we will install Cilium and Hubble, and us a GO application to integrate with their API to monitor the traffic between pods, and to apply a new Cilium Network Policy.

Cilium and Hubble provide visibility and security for the network activity in a kubernetes cluster. Cilium, installed on each kubernetes node, utilizes eBPF for high performance enforcement of network policies, while Hubble relay connects to all of the cilium, and collects network traffic statistics.


To install cilium on a bare metal kubernetes, use the following:

curl -L --remote-name-all{,.sha256sum}
sha256sum --check cilium-linux-amd64.tar.gz.sha256sum
sudo sudo tar xzvfC cilium-linux-amd64.tar.gz /usr/local/bin
rm cilium-linux-amd64.tar.gz{,.sha256sum}
cilium install

Then, to install Hubble relay on the kubernetes cluster, use the following:

cilium hubble enable
cilium status --wait

To install the Hubble client, which communicates with the Hubble relay, use the following:

export HUBBLE_VERSION=$(curl -s
curl -L --remote-name-all$HUBBLE_VERSION/hubble-linux-amd64.tar.gz{,.sha256sum}
sha256sum --check hubble-linux-amd64.tar.gz.sha256sum
sudo tar xzvfC hubble-linux-amd64.tar.gz /usr/local/bin
rm hubble-linux-amd64.tar.gz{,.sha256sum}


Let's create a GO application to print the network traffic flows captured by Hubble. We start by initiating a gRPC connection to the Hubble relay.

package main

import (

func main() {
var grpcOptions []grpc.DialOption
grpcOptions = append(grpcOptions, grpc.WithBlock())
grpcOptions = append(grpcOptions, grpc.FailOnNonTempDialError(true))
grpcOptions = append(grpcOptions, grpc.WithInsecure())

grpcConnection, err := grpc.DialContext(context.Background(), "hubble-relay.kube-system.svc.cluster.local:80", grpcOptions...)
if err != nil {

Next we wrap the gRPC connection with cilium observer, and loop on the flows.

client := observer.NewObserverClient(grpcConnection)
request := observer.GetFlowsRequest{
Follow: true,
flows, err := client.GetFlows(context.Background(), &request)
if err != nil {

for {
response, err := flows.Recv()
if err != nil {
if err == io.EOF || err == context.Canceled {


In our case we will print only the ingress permitted flows.

func analyzeFlowsResponse(response *observer.GetFlowsResponse) {
switch response.GetResponseTypes().(type) {
case *observer.GetFlowsResponse_Flow:
capturedFlow := response.GetFlow()

if capturedFlow.Verdict != flow.Verdict_FORWARDED {

if capturedFlow.TrafficDirection != flow.TrafficDirection_INGRESS {

fmt.Printf("%+v", capturedFlow)

An example printed flow is the following:

time:{seconds:1635760734  nanos:916929131}  verdict:FORWARDED  ethernet:{source:"02:dd:e6:6c:a0:db"  destination:"7e:a9:d2:3f:0b:2c"}  IP:{source:""  destination:""  ipVersion:IPv4}  l4:{TCP:{source_port:51320  destination_port:4245  flags:{SYN:true}}}  source:{identity:1  labels:"reserved:host"}  destination:{ID:1010  identity:4240  namespace:"kube-system"  labels:"k8s:io.cilium.k8s.policy.cluster=kubernetes"  labels:"k8s:io.cilium.k8s.policy.serviceaccount=hubble-relay"  labels:"k8s:io.kubernetes.pod.namespace=kube-system"  labels:"k8s:k8s-app=hubble-relay"  pod_name:"hubble-relay-5f55dc4987-vh79m"}  Type:L3_L4  node_name:"alon-laptop"  event_type:{type:4}  traffic_direction:INGRESS  trace_observation_point:TO_ENDPOINT  is_reply:{}  interface:{index:36  name:"lxcfee2d5062d36"}  Summary:"TCP Flags: SYN"


To create a cilium policy, we start by connecting to the kubernetes API:

package main

import (
ciliumApiPolicy ""
ciliumApiClient ""
ciliumApiLabels ""
ciliumApiRules ""
k8sApiMachinery ""

func main() {
restConfig, err := rest.InClusterConfig()
if err != nil {

client, err := ciliumApiClient.NewForConfig(restConfig)
if err != nil {

Next we create the policy. In this case we limit access to a pod with label app=my-target only for pod with label app=my-source.

namespace := "default"
policyName := "my-policy"
sourceLabels := map[string]string{"app": "my-source"}
targetLabels := map[string]string{"app": "my-target"}
port := 80
policy := ciliumApiPolicy.CiliumNetworkPolicy{
TypeMeta: k8sApiMachinery.TypeMeta{
Kind: "CiliumNetworkPolicy",
APIVersion: "",
ObjectMeta: k8sApiMachinery.ObjectMeta{
Name: policyName,
Namespace: namespace,
Spec: &ciliumApiRules.Rule{
EndpointSelector: ciliumApiRules.EndpointSelector{
LabelSelector: &ciliumApiLabels.LabelSelector{
MatchLabels: targetLabels,
Ingress: []ciliumApiRules.IngressRule{
IngressCommonRule: ciliumApiRules.IngressCommonRule{
FromEndpoints: []ciliumApiRules.EndpointSelector{
LabelSelector: &ciliumApiLabels.LabelSelector{
MatchLabels: sourceLabels,
ToPorts: []ciliumApiRules.PortRule{
Ports: []ciliumApiRules.PortProtocol{
Port: fmt.Sprintf("%v", port),
Protocol: "TCP",

options := k8sApiMachinery.CreateOptions{}
_, err = client.CiliumNetworkPolicies(namespace).Create(context.Background(), &policy, options)
if err != nil {

Final Note

Cilium and Hubble provide a great method to enforce network traffic policies, while maintaining high performance of the network. We can use these tools manually, or as described in this post in an automatic manner.

