Integrating the ClickHouse Component into Datasophon 1.2.1 on Ubuntu | 极客日志
Summary (AI-generated): How to integrate a ClickHouse component into the Datasophon 1.2.1 platform on Ubuntu. The steps: download the ClickHouse package matching the OS, build a tar.gz installation package, edit the service_ddl.json descriptor to define the component's roles and start/stop behavior, and write Java strategy code that guards ZooKeeper directory creation with a distributed lock. Finally, SQL queries verify the cluster's shard and replica layout, completing this custom deployment of the OLAP database.
路由之心 · Published 2026/3/25 · Updated 2026/5/10

Background
With Hive installed, only Spark, Flink, and ClickHouse remained. Spark and Flink went smoothly. When I moved on to the OLAP database ClickHouse, I found that Datasophon 1.2.1 ships no such component, so there was nothing for it but to add one myself.
Problem
How do you add a ClickHouse component? First, download a ClickHouse build that matches the operating system. Then figure out what adding a component to Datasophon involves. The details follow.
Solution
Downloading ClickHouse
My Ubuntu version is:
Welcome to Ubuntu 24.04.2 LTS (GNU/Linux 6.8.0-90-generic x86_64)
So I chose version 23.3.10.5. I downloaded quite a few packages — clickhouse-server, clickhouse-keeper-dbg, clickhouse-keeper, clickhouse-common-static-dbg, clickhouse-common-static — but in the end only clickhouse-common-static-23.3.10.5 was actually needed.
Download URL: https://packages.clickhouse.com/tgz/stable/clickhouse-common-static-23.3.10.5-amd64.tgz
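The download-and-stage steps can be sketched as follows. This is a sketch under assumptions: the staging directory /tmp/DDP/clickhouse-23.3.10.5 matches the layout used later in this post, the binary's location inside the tgz (a versioned top directory over usr/bin/clickhouse) reflects the 23.3-era tgz layout and should be checked after extraction, and the actual download only runs when CH_DOWNLOAD=1 is set, so the script is safe to dry-run offline.

```shell
#!/bin/bash
# Staging directory for the Datasophon package build (assumed layout from this post).
STAGE=/tmp/DDP/clickhouse-23.3.10.5
mkdir -p "$STAGE/bin"

PKG=clickhouse-common-static-23.3.10.5-amd64.tgz
URL="https://packages.clickhouse.com/tgz/stable/$PKG"

cd /tmp/DDP
# Download only when explicitly requested (CH_DOWNLOAD=1) and not already present.
if [ ! -f "$PKG" ] && [ "${CH_DOWNLOAD:-0}" = "1" ]; then
    curl -fSLO "$URL" || echo "download failed"
fi

if [ -f "$PKG" ]; then
    tar -xzf "$PKG"
    # Assumed internal path: versioned top dir over usr/bin/; verify after extracting.
    cp clickhouse-common-static-23.3.10.5/usr/bin/clickhouse "$STAGE/bin/"
    echo "binary staged"
else
    echo "nothing to stage (run with CH_DOWNLOAD=1 to fetch)"
fi
```

Run with `CH_DOWNLOAD=1 bash stage.sh` to actually fetch the ~250 MB package; without it the script only prepares the staging directory.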
Configuring ClickHouse in DataSophon and Writing Custom Strategy Code
With ClickHouse downloaded, it is time to set up the configuration. Three things need to happen:
1. Build the tar.gz package
In a temporary directory, create clickhouse-23.3.10.5 with the following layout:
bin directory: holds only ClickHouse's single binary, clickhouse. It comes from clickhouse-common-static-23.3.10.5-amd64.tgz — extract the archive and copy the binary into bin.
Control script: clickhouse.sh wraps starting, stopping, and status-checking the ClickHouse process so Datasophon can call it. Its content:
#!/bin/bash
set -e

CLICKHOUSE_HOME="/opt/datasophon/clickhouse"
CLICKHOUSE_BIN="$CLICKHOUSE_HOME/bin/clickhouse"
CONFIG_DIR="$CLICKHOUSE_HOME/conf"
LOG_DIR="$CLICKHOUSE_HOME/logs"
DATA_DIR="$CLICKHOUSE_HOME/data"
TMP_DIR="$CLICKHOUSE_HOME/tmp"
PID_DIR="$CLICKHOUSE_HOME/run"
PID_FILE="$PID_DIR/clickhouse-server.pid"

RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

log_info()  { echo -e "${GREEN}[INFO]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"; }
log_warn()  { echo -e "${YELLOW}[WARN]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"; }

get_pid() { pgrep -f "clickhouse.*server.*config.xml" | head -1; }

is_running() {
    local pid=$(get_pid)
    if [ -n "$pid" ]; then
        echo "$pid"
        return 0
    fi
    return 1
}

create_dirs() {
    mkdir -p "$PID_DIR" "$LOG_DIR" "$DATA_DIR"
    chmod 755 "$PID_DIR" "$LOG_DIR" "$DATA_DIR"
}

start() {
    log_info "Starting ClickHouse server..."
    local pid=$(get_pid)
    if [ -n "$pid" ]; then
        log_info "ClickHouse is already running (PID: $pid)"
        return 0
    fi
    create_dirs
    if [ ! -f "$CLICKHOUSE_BIN" ]; then
        log_error "Binary not found: $CLICKHOUSE_BIN"
        return 1
    fi
    if [ ! -f "$CONFIG_DIR/config.xml" ]; then
        log_error "Config file not found: $CONFIG_DIR/config.xml"
        return 1
    fi
    local http_port=$(grep -oP '<http_port>\K[^<]+' "$CONFIG_DIR/config.xml" 2>/dev/null || echo "8123")
    log_info "Executing: $CLICKHOUSE_BIN server --config-file=$CONFIG_DIR/config.xml"
    nohup "$CLICKHOUSE_BIN" server \
        --config-file="$CONFIG_DIR/config.xml" \
        > "$LOG_DIR/clickhouse-server.log" 2>&1 &
    local start_pid=$!
    sleep 3
    local new_pid=$(get_pid)
    if [ -n "$new_pid" ]; then
        echo "$new_pid" > "$PID_FILE"
        log_info "✅ ClickHouse started successfully (PID: $new_pid)"
        local waited=0
        while [ $waited -lt 20 ]; do
            if curl -s "http://localhost:$http_port/ping" 2>/dev/null | grep -q "Ok"; then
                log_info "✅ HTTP interface is up (port: $http_port)"
                break
            fi
            sleep 1
            waited=$((waited + 1))
        done
    else
        log_error "❌ ClickHouse failed to start"
        tail -20 "$LOG_DIR/clickhouse-server.log"
        return 1
    fi
}

stop() {
    log_info "Stopping ClickHouse server..."
    local pid=$(get_pid)
    if [ -z "$pid" ]; then
        log_info "ClickHouse is not running"
        rm -f "$PID_FILE"
        return 0
    fi
    log_info "Stopping process (PID: $pid)..."
    kill -TERM "$pid" 2>/dev/null || true
    local waited=0
    while [ $waited -lt 30 ]; do
        if ! kill -0 "$pid" 2>/dev/null; then
            break
        fi
        sleep 1
        waited=$((waited + 1))
    done
    if kill -0 "$pid" 2>/dev/null; then
        log_warn "Force-killing process..."
        kill -KILL "$pid" 2>/dev/null || true
    fi
    rm -f "$PID_FILE"
    log_info "✅ ClickHouse stopped"
}

status() {
    local pid=$(get_pid)
    if [ -f "$PID_FILE" ]; then
        local file_pid=$(cat "$PID_FILE")
        if [ "$pid" = "$file_pid" ] && [ -n "$pid" ]; then
            echo "ClickHouse is running (PID: $pid)"
            return 0
        elif [ -n "$pid" ]; then
            echo "ClickHouse is running (PID: $pid) but PID file mismatch"
            echo "Consider: echo $pid > $PID_FILE"
            return 0
        else
            echo "ClickHouse is not running (stale PID file)"
            rm -f "$PID_FILE"
            return 1
        fi
    elif [ -n "$pid" ]; then
        echo "ClickHouse is running (PID: $pid) but PID file is missing"
        echo "Consider: echo $pid > $PID_FILE"
        return 0
    else
        echo "ClickHouse is not running"
        return 1
    fi
}

restart() {
    stop
    sleep 2
    start
}

case "${1:-help}" in
    start)   start ;;
    stop)    stop ;;
    restart) restart ;;
    status)  status ;;
    *)
        echo "Usage: $0 {start|stop|restart|status}"
        echo "  start   - Start ClickHouse server"
        echo "  stop    - Stop ClickHouse server"
        echo "  restart - Restart ClickHouse server"
        echo "  status  - Check ClickHouse status"
        exit 1
        ;;
esac
exit $?
templates directory: holds the FreeMarker templates used to generate the XML config files. The files involved are shown in the figure below (omitted).
config.xml.ftl:
<?xml version="1.0"?>
<yandex>
    <users_config>conf/users.xml</users_config>
    <listen_host>0.0.0.0</listen_host>
    <http_port>${http_port}</http_port>
    <tcp_port>${tcp_port}</tcp_port>
    <interserver_http_port>${interserver_http_port}</interserver_http_port>
    <max_connections>${max_connections}</max_connections>
    <keep_alive_timeout>3</keep_alive_timeout>
    <max_concurrent_queries>100</max_concurrent_queries>
    <zookeeper>
        <#if zk_address?? && zk_address?has_content>
            <#list zk_address?split(",") as zkNode>
                <#if zkNode?contains(":")>
                    <#assign parts = zkNode?split(":")>
        <node>
            <host>${parts[0]}</host>
            <port>${parts[1]}</port>
        </node>
                <#else>
        <node>
            <host>${zkNode}</host>
            <port>2181</port>
        </node>
                </#if>
            </#list>
        <#else>
        <node>
            <host>localhost</host>
            <port>2181</port>
        </node>
        </#if>
        <session_timeout_ms>30000</session_timeout_ms>
        <operation_timeout_ms>10000</operation_timeout_ms>
        <root>${zk_root!'/clickhouse'}</root>
    </zookeeper>
    <remote_servers>
        <${cluster_name}>
            <#if cluster_hosts?? && cluster_hosts?has_content>
                <#-- Build a shard -> replicas map -->
                <#assign shardMap = {}>
                <#-- Parse each node definition -->
                <#list cluster_hosts?split(",") as hostDef>
                    <#assign hostDef = hostDef?trim>
                    <#if hostDef?contains(":")>
                        <#assign parts = hostDef?split(":")>
                        <#if parts?size == 3>
                            <#assign shardNum = parts[0]?trim>
                            <#assign replicaNum = parts[1]?trim>
                            <#assign hostName = parts[2]?trim>
                            <#-- Initialize the shard's replica list -->
                            <#if !shardMap[shardNum]??>
                                <#assign shardMap = shardMap + {shardNum: []}>
                            </#if>
                            <#-- Append the replica to its shard -->
                            <#assign replicaInfo = {"host": hostName, "replica": replicaNum}>
                            <#assign currentList = shardMap[shardNum]>
                            <#assign shardMap = shardMap + {shardNum: currentList + [replicaInfo]}>
                        </#if>
                    </#if>
                </#list>
                <#-- Emit shards sorted by shard number -->
                <#list shardMap?keys?sort as shardKey>
            <shard>
                <internal_replication>true</internal_replication>
                <#list shardMap[shardKey] as replicaInfo>
                <replica>
                    <host>${replicaInfo.host}</host>
                    <port>${tcp_port}</port>
                </replica>
                </#list>
            </shard>
                </#list>
            <#else>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>${hostname}</host>
                    <port>${tcp_port}</port>
                </replica>
            </shard>
            </#if>
        </${cluster_name}>
    </remote_servers>
    <macros>
        <cluster>${cluster_name}</cluster>
        <shard>${current_shard}</shard>
        <replica>${current_replica}</replica>
        <hostname>${hostname}</hostname>
    </macros>
    <path>${data_path}</path>
    <tmp_path>${tmp_path}</tmp_path>
    <user_files_path>${user_files_path}</user_files_path>
    <format_schema_path>/opt/datasophon/clickhouse/format_schemas/</format_schema_path>
    <logger>
        <level>${log_level}</level>
        <log>${log_dir}/clickhouse-server.log</log>
        <errorlog>${log_dir}/clickhouse-server.err.log</errorlog>
        <size>1000M</size>
        <count>10</count>
    </logger>
    <query_log>
        <database>system</database>
        <table>query_log</table>
        <partition_by>toYYYYMM(event_date)</partition_by>
        <flush_interval_milliseconds>7500</flush_interval_milliseconds>
    </query_log>
    <trace_log>
        <database>system</database>
        <table>trace_log</table>
        <partition_by>toYYYYMM(event_date)</partition_by>
        <flush_interval_milliseconds>7500</flush_interval_milliseconds>
    </trace_log>
    <compression>
        <case>
            <min_part_size>10000000000</min_part_size>
            <min_part_size_ratio>0.01</min_part_size_ratio>
            <method>lz4</method>
        </case>
    </compression>
    <merge_tree>
        <max_suspicious_broken_parts>5</max_suspicious_broken_parts>
        <max_part_loading_threads>16</max_part_loading_threads>
    </merge_tree>
    <distributed_ddl>
        <path>/clickhouse/task_queue/ddl</path>
    </distributed_ddl>
</yandex>
users.xml.ftl:
<?xml version="1.0"?>
<yandex>
    <profiles>
        <default>
            <max_memory_usage>${max_memory_usage}</max_memory_usage>
            <max_memory_usage_for_user>${max_memory_usage}</max_memory_usage_for_user>
            <max_partitions_per_insert_block>${max_partitions_per_insert_block}</max_partitions_per_insert_block>
            <use_uncompressed_cache>0</use_uncompressed_cache>
            <load_balancing>random</load_balancing>
            <log_queries>1</log_queries>
        </default>
        <readonly>
            <readonly>1</readonly>
            <max_memory_usage>5000000000</max_memory_usage>
            <max_partitions_per_insert_block>100</max_partitions_per_insert_block>
        </readonly>
    </profiles>
    <users>
        <default>
            <#if default_password?? && default_password?has_content>
            <password>${default_password}</password>
            <#else>
            <password></password>
            </#if>
            <networks>
                <ip>::/0</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
        </default>
        <datasophon>
            <password>${datasophon_password!'datasophon123'}</password>
            <networks>
                <ip>::/0</ip>
            </networks>
            <profile>readonly</profile>
            <quota>default</quota>
        </datasophon>
    </users>
    <quotas>
        <default>
            <interval>
                <duration>3600</duration>
                <queries>0</queries>
                <errors>0</errors>
                <result_rows>0</result_rows>
                <read_rows>0</read_rows>
                <execution_time>0</execution_time>
            </interval>
        </default>
    </quotas>
</yandex>
macros.xml.ftl:
<?xml version="1.0"?>
<yandex>
    <macros>
        <cluster>${cluster_name!'datasophon_cluster'}</cluster>
        <shard>${current_shard!'1'}</shard>
        <replica>${current_replica!'1'}</replica>
        <host>${hostname!'localhost'}</host>
    </macros>
</yandex>
cd /tmp/DDP
tar -zcvf clickhouse-23.3.10.5.tar.gz clickhouse-23.3.10.5/
Important: move to the parent directory of clickhouse-23.3.10.5 before packing; otherwise the directory structure inside the archive will be wrong.
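A quick way to confirm the archive was packed with the right top-level directory is to list its contents: every entry should start with clickhouse-23.3.10.5/. The check is demonstrated here on a throwaway directory so it can be run anywhere; the same two tar commands apply to the real package.

```shell
#!/bin/bash
set -e
# Demonstration with a throwaway directory; the same check applies to the
# real clickhouse-23.3.10.5.tar.gz built above.
work=$(mktemp -d)
mkdir -p "$work/clickhouse-23.3.10.5/bin"
touch "$work/clickhouse-23.3.10.5/bin/clickhouse"

cd "$work"                     # parent of the directory being packed
tar -zcf clickhouse-23.3.10.5.tar.gz clickhouse-23.3.10.5/

# Every path in the archive must carry the version-directory prefix;
# otherwise files would unpack straight into the current directory.
tar -tzf clickhouse-23.3.10.5.tar.gz | tee /tmp/ch_tar_listing.txt
if tar -tzf clickhouse-23.3.10.5.tar.gz | grep -qv '^clickhouse-23.3.10.5/'; then
    echo "BAD LAYOUT"
else
    echo "layout ok"
fi
```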
Copy the archive to /opt/datasophon/DDP/package and regenerate the md5 signature file:
cp /tmp/DDP/clickhouse-23.3.10.5.tar.gz /opt/datasophon/DDP/package
cd /opt/datasophon/DDP/package
md5sum clickhouse-23.3.10.5.tar.gz | awk '{print $1}' > clickhouse-23.3.10.5.tar.gz.md5
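Note that the `awk '{print $1}'` step leaves only the bare hash in the .md5 file, not the `hash  filename` format that `md5sum -c` expects — presumably DataSophon compares the bare hash string. A self-contained round-trip check of that scheme, using a throwaway file:

```shell
#!/bin/bash
set -e
# Round-trip demo on a throwaway file; the real package works the same way.
work=$(mktemp -d)
echo "demo payload" > "$work/pkg.tar.gz"

# Generate the signature the same way as for clickhouse-23.3.10.5.tar.gz:
md5sum "$work/pkg.tar.gz" | awk '{print $1}' > "$work/pkg.tar.gz.md5"

# Verify by recomputing and comparing the bare hashes (md5sum -c would
# reject this file format, so the comparison is done by hand).
expected=$(cat "$work/pkg.tar.gz.md5")
actual=$(md5sum "$work/pkg.tar.gz" | awk '{print $1}')
if [ "$expected" = "$actual" ]; then
    echo "md5 ok: $actual"
else
    echo "md5 MISMATCH"
fi
echo "$actual" > /tmp/ch_md5_demo.txt
```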
2. Write the component descriptor (service_ddl.json). This file matters: when datasophon-manager starts, it loads the descriptor into its component list. To keep it under version control, I first added the file in the datasophon-api module and then copied it to /opt/datasophon-manager-1.2.1/conf/meta/DDP-1.2.1/CLICKHOUSE.
{
"name" : "CLICKHOUSE" ,
"label" : "ClickHouse" ,
"description" : "ClickHouse OLAP Database System - 高性能列式数据库,支持实时分析" ,
"version" : "23.3.10.5" ,
"sortNum" : 20 ,
"dependencies" : [ "ZOOKEEPER" ] ,
"packageName" : "clickhouse-23.3.10.5.tar.gz" ,
"decompressPackageName" : "clickhouse-23.3.10.5" ,
"roles" : [
{
"name" : "ckworker" ,
"label" : "ClickHouse Worker" ,
"description" : "数据存储和计算节点" ,
"roleType" : "worker" ,
"cardinality" : "2+" ,
"logFile" : "logs/clickhouse-server.log" ,
"jmxPort" : 0 ,
"startRunner" : { "timeout" : "120" , "program" : "clickhouse.sh" , "args" : [ "start" ] } ,
"stopRunner" : { "timeout" : "60" , "program" : "clickhouse.sh" , "args" : [ "stop" ] } ,
"statusRunner" : { "timeout" : "30" , "program" : "clickhouse.sh" , "args" : [ "status" ] } ,
"restartRunner" : { "timeout" : "120" , "program" : "clickhouse.sh" , "args" : [ "restart" ] }
} ,
{
"name" : "ckclient" ,
"label" : "ClickHouse Client" ,
"description" : "查询接入节点" ,
"roleType" : "client" ,
"cardinality" : "0+" ,
"logFile" : "logs/clickhouse-server.log" ,
"jmxPort" : 0 ,
"startRunner" : { "timeout" : "120" , "program" : "clickhouse.sh" , "args" : [ "start" ] } ,
"stopRunner" : { "timeout" : "60" , "program" : "clickhouse.sh" , "args" : [ "stop" ] } ,
"statusRunner" : { "timeout" : "30" , "program" : "clickhouse.sh" , "args" : [ "status" ] } ,
"restartRunner" : { "timeout" : "120" , "program" : "clickhouse.sh" , "args" : [ "restart" ] } ,
"externalLink" : { "name" : "ClickHouse UI" , "label" : "ClickHouse UI" , "url" : "http://${host}:${http_port}" }
}
] ,
"configWriter" : {
"generators" : [
{
"filename" : "config.xml" ,
"configFormat" : "custom" ,
"outputDirectory" : "conf" ,
"templateName" : "config.xml.ftl" ,
"includeParams" : [ "http_port" , "tcp_port" , "interserver_http_port" , "zk_address" , "zk_root" , "cluster_name" , "cluster_hosts" , "data_path" , "tmp_path" , "user_files_path" , "log_dir" , "log_level" , "max_connections" , "max_memory_usage" , "current_shard" , "current_replica" , "hostname" ]
} ,
{
"filename" : "users.xml" ,
"configFormat" : "custom" ,
"outputDirectory" : "conf" ,
"templateName" : "users.xml.ftl" ,
"includeParams" : [ "default_password" , "datasophon_password" , "max_memory_usage_for_user" , "max_memory_usage" , "max_partitions_per_insert_block" ]
} ,
{
"filename" : "macros.xml" ,
"configFormat" : "custom" ,
"outputDirectory" : "conf" ,
"templateName" : "macros.xml.ftl" ,
"includeParams" : [ "cluster_name" , "current_shard" , "current_replica" , "hostname" ]
}
]
} ,
"parameters" : [
{
"name" : "http_port" ,
"label" : "HTTP 端口" ,
"description" : "ClickHouse HTTP 接口端口" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "8123"
} ,
{
"name" : "tcp_port" ,
"label" : "TCP 端口" ,
"description" : "ClickHouse 原生 TCP 端口" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "9000"
} ,
{
"name" : "interserver_http_port" ,
"label" : "内部通信端口" ,
"description" : "ClickHouse 节点间通信端口" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "9009"
} ,
{
"name" : "zk_address" ,
"label" : "ZK 地址" ,
"description" : "ZooKeeper 集群地址,格式:host1:port1,host2:port2" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "${zkUrls}"
} ,
{
"name" : "zk_root" ,
"label" : "ZK 根路径" ,
"description" : "在 ZooKeeper 中的根路径" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "/clickhouse"
} ,
{
"name" : "cluster_name" ,
"label" : "集群名称" ,
"description" : "ClickHouse 分布式集群名称" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "datasophon_cluster"
} ,
{
"name" : "cluster_hosts" ,
"label" : "集群节点列表" ,
"description" : "集群节点配置,格式:分片编号:副本编号:主机名,多个节点用逗号分隔\n示例:1:1:ddp1,1:2:ddp2,2:1:ddp3,2:2:ddp4" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "1:1:ddp1,1:2:ddp2,2:1:ddp3,2:2:ddp4"
} ,
{
"name" : "current_shard" ,
"label" : "当前节点分片" ,
"description" : "该节点所属的分片编号" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "1"
} ,
{
"name" : "current_replica" ,
"label" : "当前节点副本" ,
"description" : "该节点在分片中的副本编号" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "1"
} ,
{
"name" : "hostname" ,
"label" : "主机名" ,
"description" : "当前节点主机名" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : false ,
"hidden" : true ,
"defaultValue" : "${host}"
} ,
{
"name" : "data_path" ,
"label" : "数据目录" ,
"description" : "ClickHouse 数据存储路径" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "/opt/datasophon/clickhouse/data"
} ,
{
"name" : "tmp_path" ,
"label" : "临时目录" ,
"description" : "ClickHouse 临时文件路径" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "/opt/datasophon/clickhouse/tmp"
} ,
{
"name" : "user_files_path" ,
"label" : "用户文件目录" ,
"description" : "ClickHouse 用户文件路径" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "/opt/datasophon/clickhouse/user_files"
} ,
{
"name" : "log_dir" ,
"label" : "日志目录" ,
"description" : "ClickHouse 日志文件路径" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "/var/log/clickhouse-server"
} ,
{
"name" : "log_level" ,
"label" : "日志级别" ,
"description" : "ClickHouse 日志级别" ,
"configType" : "map" ,
"required" : true ,
"type" : "select" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "information" ,
"values" : [
{ "value" : "trace" , "desc" : "TRACE" } ,
{ "value" : "debug" , "desc" : "DEBUG" } ,
{ "value" : "information" , "desc" : "INFORMATION" } ,
{ "value" : "warning" , "desc" : "WARNING" } ,
{ "value" : "error" , "desc" : "ERROR" }
]
} ,
{
"name" : "default_password" ,
"label" : "默认用户密码" ,
"description" : "ClickHouse 默认用户密码" ,
"configType" : "map" ,
"required" : false ,
"type" : "password" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : true ,
"defaultValue" : ""
} ,
{
"name" : "datasophon_password" ,
"label" : "DataSophon 用户密码" ,
"description" : "DataSophon 管理用户密码" ,
"configType" : "map" ,
"required" : true ,
"type" : "password" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : true ,
"defaultValue" : "datasophon123"
} ,
{
"name" : "max_connections" ,
"label" : "最大连接数" ,
"description" : "最大客户端连接数" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "4096"
} ,
{
"name" : "max_memory_usage" ,
"label" : "最大内存使用" ,
"description" : "单次查询最大内存使用 (字节)" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "10000000000"
} ,
{
"name" : "max_memory_usage_for_user" ,
"label" : "用户最大内存使用" ,
"description" : "单用户单次查询最大内存使用 (字节)" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "10000000000"
} ,
{
"name" : "max_partitions_per_insert_block" ,
"label" : "单次插入最大分区数" ,
"description" : "单次 INSERT 操作允许的最大分区数量" ,
"configType" : "map" ,
"required" : true ,
"type" : "input" ,
"value" : "" ,
"configurableInWizard" : true ,
"hidden" : false ,
"defaultValue" : "100"
}
]
}
This file tells Datasophon everything about the component: its name and version, which components it depends on, which tar package it maps to, what roles it has, and how each role is started, stopped, restarted, and status-checked. It also declares which config files are involved, how each one is generated (from which template, with which parameters), and for every parameter whether it is an input or a select, visible or hidden, and so on.
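Since datasophon-manager parses this descriptor at startup, a stray comma will at least break this component's registration, so it is worth validating the JSON before restarting the manager. A sketch — the path is the one used above; the script falls back to a tiny inline sample when the real file is absent, so the check itself can be exercised anywhere:

```shell
#!/bin/bash
# Validate a service_ddl.json before restarting datasophon-manager.
FILE=${1:-/opt/datasophon-manager-1.2.1/conf/meta/DDP-1.2.1/CLICKHOUSE/service_ddl.json}
if [ ! -f "$FILE" ]; then
    # Fallback sample so the check can run outside the cluster.
    FILE=$(mktemp)
    printf '{"name":"CLICKHOUSE","roles":[{"name":"ckworker"}]}' > "$FILE"
fi

# python3 -m json.tool exits non-zero on any syntax error.
if python3 -m json.tool "$FILE" > /dev/null 2>&1; then
    echo "valid JSON: $FILE" | tee /tmp/ch_json_check.txt
else
    echo "INVALID JSON: $FILE" | tee /tmp/ch_json_check.txt
fi
```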
3. Write the custom strategy code. At first I didn't plan to add a strategy class. But at startup ClickHouse requires its ZooKeeper directory, which must exist beforehand — and a ZK node can only be created once; creating it again raises an error. A cluster install would inevitably attempt duplicate creation, so I introduced a strategy class that uses a Redis lock to ensure the directory is created exactly once.
ServiceRoleStrategyContext:
package com.datasophon.worker.strategy;

import org.apache.commons.lang.StringUtils;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServiceRoleStrategyContext {

    private static final Map<String, ServiceRoleStrategy> map = new ConcurrentHashMap<>();

    static {
        map.put("NameNode", new NameNodeHandlerStrategy("HDFS", "NameNode"));
        map.put("ZKFC", new ZKFCHandlerStrategy("HDFS", "ZKFC"));
        map.put("JournalNode", new JournalNodeHandlerStrategy("HDFS", "JournalNode"));
        map.put("DataNode", new DataNodeHandlerStrategy("HDFS", "DataNode"));
        map.put("ResourceManager", new ResourceManagerHandlerStrategy("YARN", "ResourceManager"));
        map.put("NodeManager", new NodeManagerHandlerStrategy("YARN", "NodeManager"));
        map.put("RangerAdmin", new RangerAdminHandlerStrategy("RANGER", "RangerAdmin"));
        map.put("HiveServer2", new HiveServer2HandlerStrategy("HIVE", "HiveServer2"));
        map.put("HbaseMaster", new HbaseHandlerStrategy("HBASE", "HbaseMaster"));
        map.put("RegionServer", new HbaseHandlerStrategy("HBASE", "RegionServer"));
        map.put("Krb5Kdc", new Krb5KdcHandlerStrategy("KERBEROS", "Krb5Kdc"));
        map.put("KAdmin", new KAdminHandlerStrategy("KERBEROS", "KAdmin"));
        map.put("SRFE", new FEHandlerStrategy("STARROCKS", "SRFE"));
        map.put("DorisFE", new FEHandlerStrategy("DORIS", "DorisFE"));
        map.put("DorisFEObserver", new FEObserverHandlerStrategy("DORIS", "DorisFEObserver"));
        map.put("ZkServer", new ZkServerHandlerStrategy("ZOOKEEPER", "ZkServer"));
        map.put("KafkaBroker", new KafkaHandlerStrategy("KAFKA", "KafkaBroker"));
        map.put("SRBE", new BEHandlerStrategy("STARROCKS", "SRBE"));
        map.put("DorisBE", new BEHandlerStrategy("DORIS", "DorisBE"));
        map.put("HistoryServer", new HistoryServerHandlerStrategy("YARN", "HistoryServer"));
        map.put("TezServer", new TezServerHandlerStrategy("TEZ", "TezServer"));
        map.put("KyuubiServer", new KyuubiServerHandlerStrategy("KYUUBI", "KyuubiServer"));
        map.put("ckworker", new ClickHouseWorkerHandlerStrategy("CLICKHOUSE", "ckworker"));
    }

    public static ServiceRoleStrategy getServiceRoleHandler(String type) {
        if (StringUtils.isBlank(type)) {
            return null;
        }
        return map.get(type);
    }
}
This file gains just one line, telling Datasophon which strategy class the ClickHouse ckworker role uses:
map.put("ckworker", new ClickHouseWorkerHandlerStrategy("CLICKHOUSE", "ckworker"));
ZookeeperPathService:
package com.datasophon.worker.service;

import com.datasophon.common.Constants;
import com.datasophon.common.redis.utils.RedisLockUtil;
import com.datasophon.common.utils.ExecResult;
import com.datasophon.common.utils.ShellUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;

public class ZookeeperPathService {

    private static final Logger logger = LoggerFactory.getLogger(ZookeeperPathService.class);

    private static final String ZK_PATH_LOCK_KEY = "datasophon:clickhouse:zkpath:init:lock";
    private static final long LOCK_TIMEOUT = 300;
    private static final long LOCK_WAIT_TIME = 30000;
    private static final String ZK_CLI_PATH = "/opt/datasophon/zookeeper/bin/zkCli.sh";

    public static boolean createPathWithLock(String zkServer, String zkPath, boolean createSubPaths) {
        logger.info("🚀 =========================================");
        logger.info("🚀 ZooKeeper path creation started");
        logger.info("🚀 ZK server: {}", zkServer);
        logger.info("🚀 ZK path: {}", zkPath);
        logger.info("🚀 create sub-paths: {}", createSubPaths);
        logger.info("🚀 Redis lock key: {}", ZK_PATH_LOCK_KEY);
        logger.info("🚀 =========================================");
        try {
            logger.info("1️⃣ Testing Redis connection...");
            boolean redisConnected = RedisLockUtil.testRedisConnection();
            if (!redisConnected) {
                logger.warn("⚠️ Redis connection test failed, trying to create the ZK path directly");
                return createPathDirect(zkServer, zkPath, createSubPaths);
            }
            logger.info("2️⃣ Checking whether the ZK path already exists...");
            if (isPathExists(zkServer, zkPath)) {
                logger.info("✅ ZK path already exists: {}, skipping creation", zkPath);
                return true;
            }
            logger.info("3️⃣ Creating the ZK path under a Redis distributed lock...");
            boolean result = RedisLockUtil.executeWithLock(ZK_PATH_LOCK_KEY, LOCK_TIMEOUT, LOCK_WAIT_TIME, () -> {
                logger.info("🔒 Inside the Redis-locked section, creating ZK path");
                if (isPathExists(zkServer, zkPath)) {
                    logger.info("✅ ZK path was created by another node while we waited for the lock");
                    return true;
                }
                return createPathDirect(zkServer, zkPath, createSubPaths);
            });
            logger.info("🎉 ZK path creation result: {}", result);
            return result;
        } catch (Exception e) {
            logger.error("❌ Exception during ZK path creation", e);
            logger.error("   stack trace:", e);
            logger.warn("🔄 Trying to create the ZK path directly (bypassing the Redis lock)...");
            try {
                boolean directResult = createPathDirect(zkServer, zkPath, createSubPaths);
                logger.info("direct creation result: {}", directResult);
                return directResult;
            } catch (Exception ex) {
                logger.error("❌ Direct creation failed as well", ex);
                return false;
            }
        } finally {
            logger.info("🚀 =========================================");
            logger.info("🚀 ZooKeeper path creation finished");
            logger.info("🚀 =========================================");
        }
    }

    public static boolean createPathDirect(String zkServer, String zkPath, boolean createSubPaths) {
        logger.info("🔧 Creating ZK path directly (no lock)");
        logger.info("   ZK server: {}", zkServer);
        logger.info("   ZK path: {}", zkPath);
        logger.info("   create sub-paths: {}", createSubPaths);
        try {
            if (!isZkCliAvailable()) {
                logger.error("❌ zkCli.sh missing or not executable: {}", ZK_CLI_PATH);
                return false;
            }
            logger.info("   creating root path: {}", zkPath);
            boolean mainPathCreated = createSinglePath(zkServer, zkPath);
            if (!mainPathCreated) {
                logger.error("❌ Failed to create root path: {}", zkPath);
                return false;
            }
            logger.info("✅ Root path created");
            if (createSubPaths) {
                logger.info("   creating ClickHouse sub-paths...");
                String[] subPaths = {
                        zkPath + "/task_queue",
                        zkPath + "/log",
                        zkPath + "/log/log-1",
                        zkPath + "/blocks",
                        zkPath + "/replicas",
                        zkPath + "/metadata"
                };
                for (String subPath : subPaths) {
                    if (!isPathExists(zkServer, subPath)) {
                        boolean created = createSinglePath(zkServer, subPath);
                        logger.info("   sub-path {}: {}", subPath, created ? "✅" : "❌");
                    } else {
                        logger.info("   sub-path {}: ✅ (already exists)", subPath);
                    }
                }
            }
            logger.info("✅ ZK path creation complete");
            return true;
        } catch (Exception e) {
            logger.error("🔥 Exception while creating ZK path", e);
            return false;
        }
    }

    private static boolean createSinglePath(String zkServer, String zkPath) {
        try {
            ArrayList<String> commands = new ArrayList<>();
            commands.add(ZK_CLI_PATH);
            commands.add("-server");
            commands.add(zkServer);
            commands.add("create");
            commands.add(zkPath);
            commands.add("\"\"");
            logger.debug("   running: {} -server {} create {} \"\"", ZK_CLI_PATH, zkServer, zkPath);
            ExecResult execResult = ShellUtils.execWithStatus("/tmp", commands, 30L);
            if (execResult.getExecResult()) {
                String output = execResult.getExecOut();
                if (output != null && output.contains("Created")) {
                    logger.debug("   ✅ created: {}", zkPath);
                    return true;
                }
            }
            if (execResult.getExecErrOut() != null && execResult.getExecErrOut().contains("Node already exists")) {
                logger.debug("   ℹ️ node already exists: {}", zkPath);
                return true;
            }
            logger.error("   ❌ creation failed: {}, error: {}", zkPath, execResult.getExecErrOut());
            return false;
        } catch (Exception e) {
            logger.error("   exception while creating: {}", zkPath, e);
            return false;
        }
    }

    public static boolean isPathExists(String zkServer, String zkPath) {
        logger.debug("Checking whether ZK path exists: {}", zkPath);
        try {
            ArrayList<String> commands = new ArrayList<>();
            commands.add(ZK_CLI_PATH);
            commands.add("-server");
            commands.add(zkServer);
            commands.add("ls");
            commands.add(zkPath);
            ExecResult execResult = ShellUtils.execWithStatus("/tmp", commands, 30L);
            if (execResult.getExecResult()) {
                String output = execResult.getExecOut();
                boolean exists = output != null && !output.trim().isEmpty() && !output.contains("[]");
                logger.debug("   ZK path {}: {}", zkPath, exists ? "✅ exists" : "❌ missing");
                return exists;
            }
            if (execResult.getExecErrOut() != null && execResult.getExecErrOut().contains("No node")) {
                logger.debug("   ZK path {}: ❌ missing (No node)", zkPath);
                return false;
            }
            logger.warn("   ZK path check failed: {}", execResult.getExecErrOut());
            return false;
        } catch (Exception e) {
            logger.warn("   exception while checking ZK path: {}", zkPath, e);
            return false;
        }
    }

    private static boolean isZkCliAvailable() {
        try {
            ArrayList<String> commands = new ArrayList<>();
            commands.add("test");
            commands.add("-f");
            commands.add(ZK_CLI_PATH);
            ExecResult execResult = ShellUtils.execWithStatus("/tmp", commands, 10L);
            return execResult.getExecResult();
        } catch (Exception e) {
            logger.error("Failed to check zkCli.sh", e);
            return false;
        }
    }

    public static String getZkServerFromConfig(String installPath, String packageName) {
        String configFile = installPath + Constants.SLASH + packageName + "/conf/config.xml";
        try {
            String content = cn.hutool.core.io.FileUtil.readUtf8String(configFile);
            String host = cn.hutool.core.util.ReUtil.get("<host>([^<]+)</host>", content, 1);
            String port = cn.hutool.core.util.ReUtil.get("<port>([^<]+)</port>", content, 1);
            if (host != null && port != null) {
                return host + ":" + port;
            }
        } catch (Exception e) {
            logger.warn("Failed to parse the ZK address", e);
        }
        return "192.168.1.181:2181";
    }

    public static String getZkRootFromConfig(String installPath, String packageName) {
        String configFile = installPath + Constants.SLASH + packageName + "/conf/config.xml";
        try {
            String content = cn.hutool.core.io.FileUtil.readUtf8String(configFile);
            String root = cn.hutool.core.util.ReUtil.get("<root>([^<]+)</root>", content, 1);
            if (root != null) {
                return root;
            }
        } catch (Exception e) {
            logger.warn("Failed to parse the ZK root path", e);
        }
        return "/clickhouse";
    }
}
This service creates ClickHouse's ZooKeeper directories under a Redis lock, checking whether each path already exists before creating it.
ClickHouseWorkerHandlerStrategy:
package com.datasophon.worker.strategy;

import com.datasophon.common.Constants;
import com.datasophon.common.command.ServiceRoleOperateCommand;
import com.datasophon.common.enums.CommandType;
import com.datasophon.common.utils.ExecResult;
import com.datasophon.worker.handler.ServiceHandler;
import com.datasophon.worker.service.ZookeeperPathService;

import java.sql.SQLException;

public class ClickHouseWorkerHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy {

    public ClickHouseWorkerHandlerStrategy(String serviceName, String serviceRoleName) {
        super(serviceName, serviceRoleName);
    }

    @Override
    public ExecResult handler(ServiceRoleOperateCommand command) throws SQLException, ClassNotFoundException {
        ExecResult startResult = new ExecResult();
        ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName());
        logger.info("ClickHouse role handler started, command type: {}", command.getCommandType());
        logger.info("is slave node: {}", command.isSlave());
        if (command.getCommandType().equals(CommandType.INSTALL_SERVICE) && !command.isSlave()) {
            logger.info("🚀 =========================================");
            logger.info("🚀 ClickHouse ZooKeeper path initialization");
            logger.info("🚀 =========================================");
            String installPath = Constants.INSTALL_PATH;
            String packageName = command.getDecompressPackageName();
            String zkServer = ZookeeperPathService.getZkServerFromConfig(installPath, packageName);
            String zkRoot = ZookeeperPathService.getZkRootFromConfig(installPath, packageName);
            logger.info("ZooKeeper server: {}", zkServer);
            logger.info("ZooKeeper root path: {}", zkRoot);
            boolean createSuccess = ZookeeperPathService.createPathWithLock(zkServer, zkRoot, true);
            if (createSuccess) {
                logger.info("✅ ClickHouse ZooKeeper paths initialized");
            } else {
                logger.warn("⚠️ ClickHouse ZooKeeper path initialization failed or was done by another node");
            }
            logger.info("🚀 =========================================\n");
        }
        logger.info("Starting the ClickHouse service...");
        startResult = serviceHandler.start(
                command.getStartRunner(),
                command.getStatusRunner(),
                command.getDecompressPackageName(),
                command.getRunAs());
        if (startResult.getExecResult()) {
            logger.info("✅ ClickHouse service started");
        } else {
            logger.error("❌ ClickHouse service failed to start");
        }
        return startResult;
    }
}
This strategy class was adapted from the existing strategy classes; the only addition is the call to the ZooKeeper path creation service above.
Verifying the cluster configuration
The installation defaults to a cluster layout with shards and replicas. But how do we confirm the sharding and replication actually took effect? Only the ClickHouse client can answer that.
root@ddp2:/opt/datasophon/clickhouse# ./clickhouse.sh start
[INFO] 2026-03-12 01:47:49 Starting ClickHouse server...
[INFO] 2026-03-12 01:47:49 Executing: /opt/datasophon/clickhouse/bin/clickhouse server --config-file=/opt/datasophon/clickhouse/conf/config.xml
[INFO] 2026-03-12 01:47:52 ✅ ClickHouse started successfully (PID: 507326)
[INFO] 2026-03-12 01:47:52 ✅ HTTP interface is up (port: 8123)
root@ddp2:/opt/datasophon/clickhouse# ./clickhouse.sh status
ClickHouse is running (PID: 507326)
root@ddp2:/opt/datasophon/clickhouse# cd bin
root@ddp2:/opt/datasophon/clickhouse/bin# ./clickhouse client --host 127.0.0.1 --port 9000 --user default
ClickHouse client version 23.3.10.5 (official build).
Connecting to 127.0.0.1:9000 as user default.
Connected to ClickHouse server version 23.3.10 revision 54462.

ddp2 :) SHOW CLUSTERS;

SHOW CLUSTERS

Query id: 0cb5946f-8041-4aff-96e0-6d51fe8c94d6
┌─cluster────────────┐
│ datasophon_cluster │
└────────────────────┘
1 row in set. Elapsed: 0.002 sec.
ddp2 :) SELECT cluster, shard_num, replica_num, host_name FROM system.clusters WHERE cluster = 'datasophon_cluster' ORDER BY shard_num, replica_num;

SELECT cluster, shard_num, replica_num, host_name FROM system.clusters WHERE cluster = 'datasophon_cluster' ORDER BY shard_num ASC, replica_num ASC

Query id: b22d1ae9-6ee0-4977-8b39-1e1faa8ae885
┌─cluster────────────┬─shard_num─┬─replica_num─┬─host_name─┐
│ datasophon_cluster │ 1 │ 1 │ ddp1 │
│ datasophon_cluster │ 1 │ 2 │ ddp2 │
│ datasophon_cluster │ 2 │ 1 │ ddp3 │
│ datasophon_cluster │ 2 │ 2 │ ddp4 │
└────────────────────┴───────────┴─────────────┴───────────┘
4 rows in set. Elapsed: 0.002 sec.
The final query shows the complete cluster layout.
The query SELECT cluster, shard_num, replica_num, host_name FROM system.clusters returned the full cluster topology:
┌─cluster────────────┬─shard_num─┬─replica_num─┬─host_name─┐
│ datasophon_cluster │ 1 │ 1 │ ddp1 │ ← shard 1, replica 1
│ datasophon_cluster │ 1 │ 2 │ ddp2 │ ← shard 1, replica 2
│ datasophon_cluster │ 2 │ 1 │ ddp3 │ ← shard 2, replica 1
│ datasophon_cluster │ 2 │ 2 │ ddp4 │ ← shard 2, replica 2
└────────────────────┴───────────┴─────────────┴───────────┘
This matches the cluster_hosts value entered in DataSophon exactly:
1:1:ddp1 → shard 1, replica 1, host ddp1
1:2:ddp2 → shard 1, replica 2, host ddp2
2:1:ddp3 → shard 2, replica 1, host ddp3
2:2:ddp4 → shard 2, replica 2, host ddp4
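system.clusters only proves the configuration was loaded; to confirm replication works end to end, a smoke test along these lines can be run through clickhouse-client. This is a sketch — the table names are made up, and the ZooKeeper path relies on the {shard}/{replica} macros defined in macros.xml above. The script writes the SQL to a file so the statements can be reviewed before running them against the cluster:

```shell
#!/bin/bash
set -e
# Write a replication smoke test to a file; run it afterwards with:
#   clickhouse client --host 127.0.0.1 --port 9000 < /tmp/ch_smoke_test.sql
cat > /tmp/ch_smoke_test.sql <<'SQL'
-- One replicated table per shard; {shard} and {replica} come from macros.xml.
CREATE TABLE default.smoke_local ON CLUSTER datasophon_cluster
(
    id UInt64,
    ts DateTime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/smoke_local', '{replica}')
ORDER BY id;

-- A Distributed table fanning writes out across the shards.
CREATE TABLE default.smoke_all ON CLUSTER datasophon_cluster
AS default.smoke_local
ENGINE = Distributed(datasophon_cluster, default, smoke_local, rand());

INSERT INTO default.smoke_all SELECT number, now() FROM numbers(1000);

-- Each shard should hold roughly half the rows, and with
-- internal_replication the replicas within a shard should match.
SELECT count() FROM default.smoke_all;
SQL
echo "wrote /tmp/ch_smoke_test.sql"
```

If the SELECT through the Distributed table returns all 1000 rows, and querying smoke_local directly on each host shows matching counts within a shard, sharding and replication are genuinely working.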