Wednesday, August 6, 2025

ClickHouse API Wrapper and Stub in Go


 

In this post we will create a Go wrapper for the ClickHouse API. We will also create a stub that can be used in tests.


See also the related post, Deploy ClickHouse on Kubernetes without Operator.


First we create the ClickHouse interface.


package clickhouse

type ClickhouseApi interface {
	ExecuteStatement(
		statement string,
		args ...any,
	)
	FetchRows(
		statement string,
		results any,
		args ...any,
	)
}


An example of usage:


type AgentCall struct {
	EventId     string    `ch:"EVENT_ID"`
	PoId        string    `ch:"PO_ID"`
	AgentName   string    `ch:"AGENT"`
	SourceIp    string    `ch:"SOURCE_IP"`
	RawRequest  string    `ch:"RAW_REQUEST"`
	RawResponse string    `ch:"RAW_RESPONSE"`
	EventTime   time.Time `ch:"EVENT_TIME"`
	ReportOnly  bool      `ch:"REPORT_ONLY"`
	IsVerbose   bool      `ch:"VERBOSE"`
}

const SelectByPoIdAndAgent = `
SELECT
	EVENT_ID,
	PO_ID,
	AGENT,
	SOURCE_IP,
	EVENT_TIME,
	REPORT_ONLY,
	VERBOSE,
	RAW_REQUEST,
	RAW_RESPONSE
FROM AGENT_CALL
WHERE PO_ID = ?
AND AGENT = ?
`

var agentsCalls []AgentCall
clickHouse.FetchRows(SelectByPoIdAndAgent, &agentsCalls, poId, agentName)
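
The `ch` struct tags are what tie each selected column to a struct field. As a rough illustration of that idea (not the actual clickhouse-go implementation), the sketch below reads the tags with the standard `reflect` package and builds a column-to-field map. The `columnToField` helper and the trimmed `AgentCall` struct are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// AgentCall is a trimmed copy of the struct used in the post.
type AgentCall struct {
	EventId   string    `ch:"EVENT_ID"`
	PoId      string    `ch:"PO_ID"`
	AgentName string    `ch:"AGENT"`
	EventTime time.Time `ch:"EVENT_TIME"`
	IsVerbose bool      `ch:"VERBOSE"`
}

// columnToField is a hypothetical helper: it maps each `ch` tag value
// (the column name) to the name of the struct field that carries it.
// It expects a struct value, not a pointer.
func columnToField(row any) map[string]string {
	mapping := map[string]string{}
	rowType := reflect.TypeOf(row)
	for i := 0; i < rowType.NumField(); i++ {
		field := rowType.Field(i)
		if column, ok := field.Tag.Lookup("ch"); ok {
			mapping[column] = field.Name
		}
	}
	return mapping
}

func main() {
	mapping := columnToField(AgentCall{})
	fmt.Println(mapping["AGENT"], mapping["VERBOSE"]) // prints "AgentName IsVerbose"
}
```

Because the mapping goes through the tags, the Go field names (such as `IsVerbose`) are free to differ from the column names (such as `VERBOSE`).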


The ClickHouse implementation is very straightforward, but notice the Ping before the connection is accepted as valid. Without it, we would return the connection even if it is invalid.


package clickhouse

import (
	"context"
	"fmt"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// Config, log, and kiterr come from the surrounding project and are not shown in this post.

type ClickhouseImpl struct {
	connection driver.Conn
}

func ProduceClickhouseImpl() *ClickhouseImpl {
	address := fmt.Sprintf("%v:%v", Config.ClickhouseHost, Config.ClickhousePort)
	log.Info("clickhouse connection to %v@%v", Config.ClickhouseUser, address)
	connection, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{address},
		Auth: clickhouse.Auth{
			Database: "default",
			Username: Config.ClickhouseUser,
			Password: Config.ClickhousePassword,
		},
	})
	kiterr.RaiseIfError(err)

	serverVersion, err := connection.ServerVersion()
	// Check the error before it is overwritten by the Ping result below.
	kiterr.RaiseIfError(err)
	log.Info("clickhouse server version: %v", serverVersion)

	// Verify that the connection actually works before handing it out.
	err = connection.Ping(context.Background())
	kiterr.RaiseIfError(err)

	return &ClickhouseImpl{
		connection: connection,
	}
}

func (c *ClickhouseImpl) ExecuteStatement(
	statement string,
	args ...any,
) {
	err := c.connection.Exec(context.Background(), statement, args...)
	kiterr.RaiseIfError(err)
}

func (c *ClickhouseImpl) FetchRows(
	statement string,
	results any,
	args ...any,
) {
	err := c.connection.Select(context.Background(), results, statement, args...)
	kiterr.RaiseIfError(err)
}


The ClickHouse stub is a bit more complex, since it needs reflection to fill the caller's destination slice.


package clickhouse

import (
	"fmt"
	"reflect"
)

// StubFetcher returns the canned rows for a given statement and arguments.
type StubFetcher func(
	statement string,
	args ...any,
) []interface{}

type ClickhouseStub struct {
	StubFetcher StubFetcher
}

func ProduceClickhouseStub(
	stubFetcher StubFetcher,
) *ClickhouseStub {
	return &ClickhouseStub{
		StubFetcher: stubFetcher,
	}
}

// ExecuteStatement is a no-op in the stub.
func (c *ClickhouseStub) ExecuteStatement(
	statement string,
	args ...any,
) {

}

// FetchRows appends the rows returned by StubFetcher to the slice that dest points to.
func (c *ClickhouseStub) FetchRows(
	statement string,
	dest any,
	args ...any,
) {
	results := c.StubFetcher(statement, args...)
	value := reflect.ValueOf(dest)
	if value.Kind() != reflect.Ptr {
		kiterr.RaiseIfError(fmt.Errorf("destination must be a pointer"))
	}
	if value.IsNil() {
		kiterr.RaiseIfError(fmt.Errorf("destination must not be nil"))
	}
	direct := reflect.Indirect(value)
	if direct.Kind() != reflect.Slice {
		kiterr.RaiseIfError(fmt.Errorf("destination must be a slice"))
	}

	// Each result must have the same type as the slice element.
	for _, result := range results {
		direct.Set(reflect.Append(direct, reflect.ValueOf(result)))
	}
}
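
To show how the stub might be used in a test, here is a self-contained sketch. It repeats a condensed version of the interface and stub so that it compiles on its own, and it uses `panic` in place of `kiterr`. The `countAgentCalls` function and the trimmed `AgentCall` struct are hypothetical and exist only to have something to test.

```go
package main

import (
	"fmt"
	"reflect"
)

// ClickhouseApi mirrors the interface from the post.
type ClickhouseApi interface {
	ExecuteStatement(statement string, args ...any)
	FetchRows(statement string, results any, args ...any)
}

type StubFetcher func(statement string, args ...any) []any

type ClickhouseStub struct {
	StubFetcher StubFetcher
}

// Compile-time check that the stub satisfies the interface.
var _ ClickhouseApi = (*ClickhouseStub)(nil)

func (c *ClickhouseStub) ExecuteStatement(statement string, args ...any) {}

// FetchRows appends the canned rows to the slice that dest points to.
func (c *ClickhouseStub) FetchRows(statement string, dest any, args ...any) {
	value := reflect.ValueOf(dest)
	if value.Kind() != reflect.Ptr || value.IsNil() {
		panic("destination must be a non-nil pointer")
	}
	slice := value.Elem()
	if slice.Kind() != reflect.Slice {
		panic("destination must point to a slice")
	}
	for _, row := range c.StubFetcher(statement, args...) {
		slice.Set(reflect.Append(slice, reflect.ValueOf(row)))
	}
}

// AgentCall is a trimmed version of the struct from the post.
type AgentCall struct {
	PoId      string
	AgentName string
}

// countAgentCalls is a hypothetical function under test. It depends only
// on the interface, so the stub can replace the real implementation.
func countAgentCalls(api ClickhouseApi, poId string, agentName string) int {
	var calls []AgentCall
	api.FetchRows(
		"SELECT PO_ID, AGENT FROM AGENT_CALL WHERE PO_ID = ? AND AGENT = ?",
		&calls, poId, agentName,
	)
	return len(calls)
}

func main() {
	stub := &ClickhouseStub{
		StubFetcher: func(statement string, args ...any) []any {
			return []any{
				AgentCall{PoId: "po-1", AgentName: "agent-a"},
				AgentCall{PoId: "po-1", AgentName: "agent-a"},
			}
		},
	}
	fmt.Println(countAgentCalls(stub, "po-1", "agent-a")) // prints 2
}
```

The rows returned by the fetcher must be values of the slice's element type (here `AgentCall`, not `*AgentCall`); otherwise `reflect.Append` panics.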





