In this post we will review how to create a secure Kafka connection from Go.
See also the post Kafka Producer and Consumer in GO.
When trying to connect to a secure Apache Kafka server, you will usually receive 2 files:
- client.keystore.jks
- client.truststore.jks
These files should be converted to PEM files.
Use the following script to create the PEM files:
- server.cer.pem
- client.cer.pem
- client.key.pem
# Convert the JKS stores received from the Kafka administrator into PEM files.
# Inputs:  input/client.truststore.jks, input/client.keystore.jks
# Outputs: output/server.cer.pem, output/client.cer.pem, output/client.key.pem
# Stop at the first failing command so a partial conversion is not mistaken
# for a complete one.
set -eu

JKS_PASS="jks-pass"
P12_PASS="topsecret"

# keytool and openssl do not create the destination directory themselves.
mkdir -p output

# Truststore -> PKCS12 -> server (CA) certificate(s).
keytool -importkeystore \
  -srckeystore input/client.truststore.jks \
  -destkeystore output/server.p12 \
  -deststoretype PKCS12 \
  -srcstorepass "$JKS_PASS" \
  -deststorepass "$P12_PASS"
openssl pkcs12 -in output/server.p12 -nokeys -out output/server.cer.pem -password "pass:$P12_PASS"

# Keystore -> PKCS12 -> client certificate and client private key.
keytool -importkeystore \
  -srckeystore input/client.keystore.jks \
  -destkeystore output/client.p12 \
  -deststoretype PKCS12 \
  -srcstorepass "$JKS_PASS" \
  -deststorepass "$P12_PASS"
openssl pkcs12 -in output/client.p12 -nokeys -out output/client.cer.pem -password "pass:$P12_PASS"
# -nodes writes the private key unencrypted, so restrict access to the file.
openssl pkcs12 -in output/client.p12 -nodes -nocerts -out output/client.key.pem -password "pass:$P12_PASS"
chmod 600 output/client.key.pem
To use a secure connection from a Go client, use the following code:
package consumer
import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/Shopify/sarama"
"io/ioutil"
)
// connect creates a sarama sync producer that talks to the broker at
// 127.0.0.1:30010 over TLS with SASL/PLAIN authentication.
//
// It panics if the producer cannot be created. The producer is closed when
// the function returns.
func connect() {
	config := sarama.NewConfig()
	config.Version = sarama.V1_1_1_0
	config.Consumer.Return.Errors = true
	// A sync producer can only be built from a config that returns successes.
	config.Producer.Return.Successes = true

	config.Net.TLS.Enable = true
	config.Net.TLS.Config = newTLSConfig()

	config.Net.SASL.Enable = true
	config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	config.Net.SASL.User = "my-user"
	config.Net.SASL.Password = "my-password"

	// Pass the config built above; passing nil would silently fall back to
	// sarama's defaults and drop the TLS and SASL settings.
	syncProducer, err := sarama.NewSyncProducer([]string{"127.0.0.1:30010"}, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		if cerr := syncProducer.Close(); cerr != nil {
			fmt.Printf("closing sync producer: %v", cerr)
		}
	}()

	fmt.Printf("sync producer created: %v", syncProducer)
}
// newTLSConfig builds the client TLS configuration from the PEM files in the
// current working directory:
//
//   - client.cer.pem / client.key.pem: the client certificate and private key
//     presented to the server.
//   - server.cer.pem: the CA certificate(s) used to verify the server.
//
// It panics if any file cannot be read or parsed, or if server.cer.pem
// contains no usable certificate.
func newTLSConfig() *tls.Config {
	cert, err := tls.LoadX509KeyPair("client.cer.pem", "client.key.pem")
	if err != nil {
		panic(err)
	}

	caCert, err := ioutil.ReadFile("server.cer.pem")
	if err != nil {
		panic(err)
	}

	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when nothing was parsed; an empty pool
	// would make every server certificate fail verification later on.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		panic("no CA certificates found in server.cer.pem")
	}

	// InsecureSkipVerify is deliberately left false so the server certificate
	// is verified against RootCAs.
	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
	}
}
No comments:
Post a Comment