In this post we present a method to run DDL for table creation and table drop regardless of whether we use a ClickHouse cluster or a standalone ClickHouse server.
Standalone ClickHouse and a ClickHouse cluster differ in the statements used to create and drop a table.
Below are examples.
Table Creation for Standalone
-- Standalone: a single statement creates the table with the plain MergeTree engine.
CREATE TABLE IF NOT EXISTS events
(
id UInt64,
user_id UInt32,
event_type String,
created_at DateTime
)
ENGINE = MergeTree()
ORDER BY (id)
Table Creation for Cluster
Here we need to create both the local table that exists on each of the cluster nodes, and the distributed table that enables fetching from any cluster node.
-- Step 1: the local table, created on every node of my_cluster.
-- It uses the Replicated variant of the engine; the first argument is the
-- keeper path of the table and the second is the replica name. {shard},
-- {table} and {replica} are placeholders that ClickHouse substitutes on each node.
CREATE TABLE IF NOT EXISTS events_LOCAL on CLUSTER my_cluster
(
id UInt64,
user_id UInt32,
event_type String,
created_at DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{table}', '{replica}')
ORDER BY (id)
;
-- Step 2: the distributed table, with the same columns, that routes queries
-- to the local tables. Distributed arguments, in order: cluster name,
-- database, local table name, sharding key expression.
CREATE TABLE IF NOT EXISTS events on CLUSTER my_cluster
(
id UInt64,
user_id UInt32,
event_type String,
created_at DateTime
)
ENGINE = Distributed(
my_cluster,
default,
events_LOCAL,
1
)
Table Drop for Standalone
-- Standalone: dropping the single table is all that is needed.
DROP TABLE IF EXISTS events
Table Drop for Cluster
Notice that not only do we need to drop the local and distributed tables, but we also need to remove the replicas from ClickHouse Keeper.
-- Step 1: drop the distributed table on the cluster.
DROP TABLE IF EXISTS events on cluster my_cluster
;
-- Step 2: drop the local table on the cluster.
DROP TABLE IF EXISTS events_LOCAL on cluster my_cluster
;
-- Step 3: remove each replica's registration from the keeper path of the
-- local table; one statement per replica name.
SYSTEM DROP REPLICA 'clickhouseserver-statefulset-0'
FROM ZKPATH '/clickhouse/tables/01/events_LOCAL'
;
SYSTEM DROP REPLICA 'clickhouseserver-statefulset-1'
FROM ZKPATH '/clickhouse/tables/01/events_LOCAL'
Abstract DDL Wrapper
// GetLocalTableName returns the name of the table that physically stores the
// data for tableName. When the connection is configured as a cluster
// connection the "_LOCAL" suffix is appended; otherwise tableName is returned
// unchanged.
func GetLocalTableName(tableName string) string {
	suffix := ""
	if clickhouse.Config.ClickhouseConnectionToCluster {
		suffix = "_LOCAL"
	}
	return tableName + suffix
}
// mergeTreeEngineRe matches a MergeTree-family engine call such as
// "MergeTree()" or "ReplacingMergeTree(ver)". Group 1 is the engine name and
// group 2 is the raw parameter list. Nested parentheses inside the parameter
// list are not supported by this pattern.
var mergeTreeEngineRe = regexp.MustCompile(`(\w*MergeTree)\s*\(([^)]*)\)`)

// convertToClusterEngine rewrites the engine clause of a table definition so
// that it uses the Replicated variant of the engine.
//
// engineAndMore is the text that follows "ENGINE =" in a CREATE TABLE
// statement, e.g. "MergeTree() ORDER BY (id)". The first MergeTree-family
// engine call found is replaced by "Replicated<Engine>(<keeper path>,
// <replica>[, <original parameters>])"; everything before and after the
// engine call is returned untouched.
//
// If the engine is already a Replicated* engine the input is returned
// unchanged, so the keeper path and replica arguments are never added twice.
//
// If no engine call can be located, kiterr.RaiseError is invoked.
func convertToClusterEngine(
	engineAndMore string,
) string {
	loc := mergeTreeEngineRe.FindStringSubmatchIndex(engineAndMore)
	if loc == nil {
		kiterr.RaiseError("unable to locate engine in %v", engineAndMore)
		// NOTE(review): RaiseError presumably aborts the flow; this return only
		// guards against indexing a nil match should it ever return.
		return engineAndMore
	}

	engine := engineAndMore[loc[2]:loc[3]]
	if strings.HasPrefix(engine, "Replicated") {
		// Already a replicated engine: prefixing again would produce an
		// invalid "ReplicatedReplicated..." engine name.
		return engineAndMore
	}

	// Trim so that a whitespace-only parameter list does not yield a dangling
	// ", " after the replica argument.
	parameters := strings.TrimSpace(engineAndMore[loc[4]:loc[5]])
	if parameters != "" {
		parameters = ", " + parameters
	}

	replacement := fmt.Sprintf(
		"Replicated%s('/clickhouse/tables/{shard}/{table}', '{replica}'%s)",
		engine,
		parameters,
	)

	// Splice by index instead of ReplaceAllString so that a "$" inside the
	// parameters is kept literally and only the located engine call changes.
	return engineAndMore[:loc[0]] + replacement + engineAndMore[loc[1]:]
}
// CreateTable creates tableName with the given column definition and engine
// clause, adapting the DDL to the configured connection mode.
//
// columns is the parenthesised column list and engineAndMore is the text that
// follows "ENGINE =" (engine call plus any trailing clauses); surrounding
// whitespace of engineAndMore is trimmed.
//
// Standalone mode issues a single CREATE TABLE. Cluster mode issues two
// statements on the configured cluster: first the local table (named by
// GetLocalTableName, with the engine converted by convertToClusterEngine),
// then a Distributed table named tableName that points at the local table.
func CreateTable(
	clickHouse clickhouse.ClickhouseApi,
	tableName string,
	columns string,
	engineAndMore string,
) {
	engineAndMore = strings.TrimSpace(engineAndMore)

	// Standalone: one plain statement and we are done.
	if !clickhouse.Config.ClickhouseConnectionToCluster {
		clickHouse.ExecuteStatement(fmt.Sprintf(
			`
CREATE TABLE IF NOT EXISTS %v
%v
ENGINE = %v
`,
			tableName,
			columns,
			engineAndMore,
		))
		return
	}

	// Cluster, step 1: the local table holding the data.
	localDDL := fmt.Sprintf(
		`
CREATE TABLE IF NOT EXISTS %v on CLUSTER %v
%v
ENGINE = %v
`,
		GetLocalTableName(tableName),
		clickhouse.Config.ClickhouseClusterName,
		columns,
		convertToClusterEngine(engineAndMore),
	)
	clickHouse.ExecuteStatement(localDDL)

	// Cluster, step 2: the distributed table on top of the local one.
	distributedDDL := fmt.Sprintf(
		`
CREATE TABLE IF NOT EXISTS %v on CLUSTER %v
%v
ENGINE = Distributed(
%v,
default,
%v,
1
)
`,
		tableName,
		clickhouse.Config.ClickhouseClusterName,
		columns,
		clickhouse.Config.ClickhouseClusterName,
		GetLocalTableName(tableName),
	)
	clickHouse.ExecuteStatement(distributedDDL)
}
// DropTable drops tableName, adapting the statements to the configured
// connection mode.
//
// Standalone mode issues a single DROP TABLE. Cluster mode drops the
// distributed table and then the local table (both "on cluster", executed
// through ExecuteStatementOnAllNodes), and finally calls dropReplica for
// node indexes 0 and 1.
func DropTable(
	clickHouse clickhouse.ClickhouseApi,
	tableName string,
) {
	if !clickhouse.Config.ClickhouseConnectionToCluster {
		clickHouse.ExecuteStatement(`DROP TABLE IF EXISTS ` + tableName)
		return
	}

	// Distributed table first, then the local table behind it.
	for _, name := range []string{tableName, GetLocalTableName(tableName)} {
		statement := fmt.Sprintf(
			`DROP TABLE IF EXISTS %v on cluster %v`,
			name,
			clickhouse.Config.ClickhouseClusterName,
		)
		clickHouse.ExecuteStatementOnAllNodes(statement)
	}

	// Remove the replica registrations for node indexes 0 and 1.
	for nodeIndex := 0; nodeIndex < 2; nodeIndex++ {
		dropReplica(clickHouse, tableName, nodeIndex)
	}
}
// dropReplica issues SYSTEM DROP REPLICA for the replica named
// 'democlickhouseserver-statefulset-<nodeIndex>' under the keeper path
// '/clickhouse/tables/01/<local table name>', executing it through
// ExecuteStatementOnAllNodes.
//
// The call is best-effort: it runs inside kiterr.RunSafe and any resulting
// error is only passed to kiterr.LogWarnIfError.
func dropReplica(
	clickHouse clickhouse.ClickhouseApi,
	tableName string,
	nodeIndex int,
) {
	attempt := func() {
		statement := fmt.Sprintf(
			`SYSTEM DROP REPLICA 'democlickhouseserver-statefulset-%v' FROM ZKPATH '/clickhouse/tables/01/%v'`,
			nodeIndex,
			GetLocalTableName(tableName),
		)
		clickHouse.ExecuteStatementOnAllNodes(statement)
	}

	// A failure here is deliberately not propagated; it is logged as a warning.
	kiterr.LogWarnIfError(kiterr.RunSafe(attempt))
}
tableName := "events"
columns := `
(
id UInt64,
user_id UInt32,
event_type String,
created_at DateTime
)
`
engine := "MergeTree() ORDER BY (id)"
CreateTable(
clickhouseApi,
tableName,
columns,
engine,
)
DropTable(
clickhouseApi,
tableName,
)