一 副本与分片概述
副本(replica) 是指两个相同数据的表或表一部分,作用是为了数据备份与安全
分片(shard) 是指不同的服务器存储同一张表的不同部分,作用是为了水平切分表,缓解单一服务的压力.
针对于副本的需求,有两种不同的方式,后面会一一概述.
二 下载并安装zookeeper
clickhouse要实现副本与分片需要依赖于zookeeper,并且zookeeper版本要3.4.5以及以上.
ZK的下载与安装参考我另外一篇博文Zookeeper的下载与安装
三 ClickHouse配置zookeeper
安装启动好zookeeper后,我们需要在clickhouse中配置zookeeper
clickhouse两种配置zookeeper方式,一种是直接在config.xml中配置,另外一种是在外部文件中配置好了,在config.xml中进行引用.
1 内部配置方式:vim /etc/clickhouse-server/config.xml
添加如下配置:
1 | <zookeeper> |
2 外部配置方式:
创建外部配置文件:
1 | vim /etc/clickhouse-server/config.d/zks.xml |
1 |
|
引入外部配置文件中的配置
1 | vim /etc/clickhouse-server/config.xml |
1 | #文件的路径 |
上方是麻烦的写法,如果在zks中的标签
zk的配置不支持热更改,必须要重启clickhouse服务,但是在重启之前可以先使用以下sql查询:
** **select * from system.zookeeper where path = '/';
会报表不存在.
重启服务后,再还执行上方sql,就可以查询到zookeeper表,说明zookeeper配置好了.
4 副本实现方式
1 复制合并树引擎实现副本
当前直接支持副本的引擎是Replicatedxxx复制合并树引擎,对于复制合并树引擎实现副本我已经写过了,所以这里只上链接
ClickHouse ReplicatedMergeTree家族引擎
2 通过分片配置实现副本
首先要配置cluster.
1 | vim /etc/clickhouse-server/config.xml |
在vim模式下输入:/remote_servers快速查找到配置cluster的标签

在<remote_servers>标签之中就是我们配置shard与replica的地方
先看下面配置,说明
1 | <my_shard> |
<my_shard>是cluster名称,任意填写,在后面通过这个名字引用如 on cluster my_shard
shard指分片,有几个分片就配置几个
replica指副本,有几个副本就有几个
上方的配置是有两个分片,每个分片两个副本.这里理解也可以是有两个分片,每个分片一个副本.
打个比方说,一张test表基于以上配置,共有id 1,2,3,4 四条数据,那么node01中有1,2两条,node02中有1,2两条,node03中有3,4两条,node04中有3,4两条,这样子举例比较好理解.,至于到底是两个分片两个副本,还是两个分片一个副本,随意,但是本文会说是两个副本.
因为这里我们是演示副本的实现,所以就不考虑分片了,分片后面说,所以现在只创建一个分片,分片中有三个副本.
每台服务器中都增加以下配置,host换成自己的ip
1 | <remote_servers incl="clickhouse_remote_servers" > |
上面配置好了以后,cluster可以热加载,所以不需要重启服务,通过系统表可以查看cluster
1 | select * from system.clusters where cluster='my_shard'; |

接下来的任务,需要借助于分布式表来完成,但是介绍分布式表之前,先说一下分布式ddl
分布式ddl指的是在一台服务器上执行sql,其他服务器同步执行,需要借助于cluster,如下方
create table tableName on cluster my_shard (id Int8,name String)engine=xxx ;
上面创建的一张表,除了on cluster my_shard以外就是正常的创建表语句,my_shard我们上面配置过,里面一共包含了node01,node02,node03 三个replica,那么在执行的时候,就会到my_shard配置下的所有服务器中执行create table语句.也就是这三台服务器任意一台执行以上sql,三台的表都会创建好,这就是分布式ddl,分布式ddl也需要zookeeper的支持,同样也支持drop table…
了解了分布式ddl,我们可以在任意一台机器上创建表:
create table default.replicaTest on cluster my_shard(id Int8,name String) engine =MergeTree order by id;

此时发现my_shard配置的node01,node02,node03上面都创建好了replicaTest表.表创建好了以后,接下来要创建分布式表,分布式表本身不存储任何数据,可以把它当成数据操作的工具,他会分发到被分布式表代理的其他表.
Distributed(cluster_name, db_name, table_name[, sharding_key[, policy_name]])
第一个是配置的cluster名称,第二个第三个分别代表代理的数据库,数据表,第四个参数是数据插入分发策略,指定一个字段名(必须是Int类型)或者rand(),第五个参数是策略名称.
下面基于上面的replicaTest创建一个分布式表:
1 | create table replicaTest_all as replicaTest |
我在node01机器上创建了一张分布式表(当然也可以通过分布式ddl,每台机器上都创建一张分布式表)
创建好了,我们插入数据:
1 | insert into replicaTest_all values(1,'zhang'); |
插入成功后,每台机器查询replicaTest表数据,发现三台机器上都有1,zhang这条数据.而查询replicaTest_all分布式表,也只查询到了1,zhang一条数据.
插入分布式表时,基于my_shard的配置,会把插入的数据全部分发到代理的每一台服务器的replicaTest表,这也就是为什么我们插入replicaTest_all结果三台机器都有这条数据的原因.而查询时,会从副本中选择一个查询(查询有策略可以选择,感兴趣可以查询load_balancing).针对于insert与select会作用于本地表,其他的操作基本都只会作用于分布式表.
通过分布式表的方式完成副本的实现,总结一下,1 配置文件配置cluster 2 cluster中配置的服务器上创建表 3 通过分布式表代理本地表,实现数据插入后分发到本地表.
这种数据副本的实现可以是任何引擎,但是对于某台Clickhouse的压力比较大(要负责向所有分片副本的发送),而第一种的副本实现必须是Repicated*合并树引擎,压力小于分布式表的方式(可以通过配置,只负责分片的发送).
通过分布式表的方式实现数据副本时,写入我们配置的分布式表时,分布式表会往shard下的所有replica都写入一份数据.如果我们这里的replicaTest表改成ReplicatedMergeTree引擎,那么我们就只需要分布式表选择一个replica写入即可,由ReplicatedMergeTree完成数据同步,而不需要每个replica都要写入,这项配置需要添加 <internal_replication>true<internal_replication>
1 | <my_shard> |
添加以上配置之后,每个shard只会写入一个<replica>,由写入的replica负责数据同步给其他<replica>
五 分片
上面我们只使用了一个分片,接下来说一下分片
1 | <remote_servers incl="clickhouse_remote_servers" > |
我们每台服务器添加以上配置,此时没有使用副本,只是三个分片.
查询分片:

配置好后,通过分布式ddl先创建表
1 | create table replicaTest2 on cluster my_shard_2 (id Int,name String) |
创建分布式表:
1 | create table replicaTest2_all as replicaTest2 |
插入数据
1 | insert into replicaTest2_all values(1,'zhang'),(2,'li'),(3,'zhao'), |
查看数据:
node01:
1 | node01.hadoop.com :) select * from replicaTest2; |
node02:
1 | node02.hadoop.com :) select * from replicaTest2; |
node03:
1 | node03.hadoop.com :) select * from replicaTest2; |
由此可见数据被replicaTest2_all分布式表随机分发到了三个分片中.
分发有策略,需要做配置<weight>
1 | <shard> |
weight默认为1,既node01 node02 node03 都为1,那么总的权重为3,每个分片通过分布式表第四个参数sharding_key分配到数据的概率是一样.
如果我们权重分别设置为1,2,3 那么总权重是6,那么总区间就是[0,6),排在shard配置第一位的node01,权重占比为1/6,所以属于区间[0,1),排在shard配置第二位的node02,占比2/6,所以区间为[1,3),至于最后的node03就是[3,6).所以如果rand()产生的数字除以6取余落在哪个区间,数据就会分发到哪个shard,通过权重配置,可以实现数据按照想要的比重分配.
六 副本与分片
上面分别讲述了副本与分片的实现,其实最合适的方法就是利用分布式表实现分片数据的写入,而每一分片内通过Replicated*引擎实现副本数据的同步,下面实现这个:
三台服务器配置my_shard_3
1 |
|
此时有两个shard 第二shard有两个副本.(因为本人只有三台服务器,故这样分配)
利用分布式ddl创建ReplicatedMergeTree表
1 | CREATE TABLE default.replicaTest3 on cluster my_shard_3 |
上方的创建表以及宏变量在上方贴的ClickHouse ReplicatedMergeTree家族引擎链接中已经讲过了,这里不再赘述,下面是我的配置的宏变量:**
**node01:
1 | node01.hadoop.com :) select * from system.macros; |
node02:
1 |
|
node03
1 | node03.hadoop.com :) select * from system.macros; |
创建好本地表后,创建分布式表:
1 | CREATE TABLE default.replicaTest3_all |
插入数据:
1 | insert into replicaTest3_all values(1,'zhangfei'),(2,'guanyu'),(3,'liubie'),(4,'zhaoyun'), |
查看数据:
node01:
1 | ┌─id─┬─name────┐ |
node02和node03
1 | ┌─id─┬─name─────┐ |
至此,最适合生产的分片与副本机制就配置好了.