ClickHouse-分片集群

ClickHouse-分片集群

副本虽然能够提高数据的可用性,降低丢失风险,但是每台服务器实际上必须容纳全量数据,对数据的横向扩容没有解决。

要解决数据水平切分的问题,需要引入分片的概念。通过分片把一份完整的数据进行切分,不同的分片分布到不同的节点上,再通过 Distributed 表引擎把数据拼接起来一同使用。

Distributed 表引擎本身不存储数据,有点类似于 MyCat 之于 MySql,成为一种中间件,通过分布式逻辑表来写入、分发、路由来操作多台节点不同分片的分布式数据。

注意:ClickHouse 的集群是表级别的,实际企业中,大部分做了高可用,但是没有用分

片,避免降低查询性能以及操作集群的复杂性。


1.集群写入流程(3 分片 2 副本共 6 个节点)

imginternal_replication:内部副本同步

true:由分片自己同步

false:由distribute表同步,压力大


2.集群读取流程(3 分片 2 副本共 6 个节点)

img


3 分片 2 副本共 6 个节点集群配置(供参考)

配置的位置还是在之前的/etc/clickhouse-server/config.d/metrika.xml,内容如下

注:也可以不创建外部文件,直接在 config.xml 的<remote_servers>中指定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<yandex>
  <remote_servers>
      <fz_cluster> <!-- 集群名称-->
          <shard> <!--集群的第一个分片-->
              <internal_replication>true</internal_replication>
              <!--该分片的第一个副本-->
              <replica>
                  <host>node01</host>
                  <port>9000</port>
              </replica>
              <!--该分片的第二个副本-->
              <replica>
                  <host>node02</host>
                  <port>9000</port>
              </replica>
          </shard>
          <shard> <!--集群的第二个分片-->
              <internal_replication>true</internal_replication>
              <replica> <!--该分片的第一个副本-->
                  <host>node03</host>
                  <port>9000</port>
              </replica>
              <replica> <!--该分片的第二个副本-->
                  <host>node04</host>
                  <port>9000</port>
              </replica>
          </shard>
          <shard> <!--集群的第三个分片-->
              <internal_replication>true</internal_replication>
              <replica> <!--该分片的第一个副本-->
                  <host>node05</host>
                  <port>9000</port>
              </replica>
              <replica> <!--该分片的第二个副本-->
                  <host>node06</host>
                  <port>9000</port>
              </replica>
          </shard>
      </fz_cluster>
  </remote_servers>
</yandex>

4 .配置三节点版本集群及副本

4.1 集群及副本规划(2 个分片,只有第一个分片有副本)

img


4.2 配置步骤

1)在 Node01 的/etc/clickhouse-server/config.d 目录下创建 metrika-shard.xml 文件

vim /etc/clickhouse-server/config.d/metrika-shard.xml

注:也可以不创建外部文件,直接在 config.xml 的<remote_servers>中指定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?xml version="1.0"?>
<yandex>
    <remote_servers>
        <gmall_cluster> <!-- 集群名称-->
            <shard> <!--集群的第一个分片-->
                <internal_replication>true</internal_replication>
                <replica> <!--该分片的第一个副本-->
                    <host>Node01</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>1234qwer</password>
                </replica>
                <replica> <!--该分片的第二个副本-->
                    <host>Node02</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>1234qwer</password>
                </replica>
            </shard>
            <shard> <!--集群的第二个分片-->
                <internal_replication>true</internal_replication>
                <replica> <!--该分片的第一个副本-->
                    <host>Node03</host>
                    <port>9000</port>
                    <user>default</user>
                    <password>1234qwer</password>
                </replica>
            </shard>
        </gmall_cluster>
    </remote_servers>
    
    <zookeeper-servers>
        <node index="1">
            <host>Node01</host>
            <port>2181</port>
        </node>
        <node index="2">
            <host>Node02</host>
            <port>2181</port>
        </node>
        <node index="3">
            <host>Node03</host>
            <port>2181</port>
        </node>
    </zookeeper-servers>
    <macros>
        <shard>01</shard> <!--不同机器放的分片数不一样-->
        <replica>rep_1_1</replica> <!--不同机器放的副本数不一样-->
    </macros>
</yandex>

2)将 Node01 的 metrika-shard.xml 同步到 Node02 和 Node03

1
2
scp /etc/clickhouse-server/config.d/metrika-shard.xml root@Node02:/etc/clickhouse-server/config.d/
scp /etc/clickhouse-server/config.d/metrika-shard.xml root@Node03:/etc/clickhouse-server/config.d/

3)修改 Node02 和 Node03 中 metrika-shard.xml 宏的配置

(1)Node02

1
2
3
4
<macros>
      <shard>01</shard> <!--不同机器放的分片数不一样-->
      <replica>rep_1_2</replica> <!--不同机器放的副本数不一样-->
</macros>

(2)Node03

1
2
3
4
<macros>
      <shard>02</shard> <!--不同机器放的分片数不一样-->
      <replica>rep_2_1</replica> <!--不同机器放的副本数不一样-->
</macros>

4)在 Node01 上修改/etc/clickhouse-server/config.xml

1
2
3
4
 vim /etc/clickhouse-server/config.xml 

<zookeeper incl="zookeeper-servers" optional="true" />
<include_from>/etc/clickhouse-server/config.d/metrika-shard.xml</include_from>

5)同步/etc/clickhouse-server/config.xml 到 Node02 和 Node03

1
2
scp /etc/clickhouse-server/config.xml root@Node02:/etc/clickhouse-server/
scp /etc/clickhouse-server/config.xml root@Node03:/etc/clickhouse-server/

6)重启三台服务器上的 ClickHouse 服务

sudo clickhouse restart


查看集群

1
2
3
4
5
6
7
8
9
10
11
12
13
superset-BI :) show clusters;
SHOW CLUSTERS
Query id: 391735d2-bf74-43f5-aa86-b6d203c357cd
┌─cluster─────────────────────────────────────────┐
│ gmall_cluster                                   │
│ test_cluster_one_shard_three_replicas_localhost │
│ test_cluster_two_shards                         │
│ test_cluster_two_shards_internal_replication   │
│ test_cluster_two_shards_localhost               │
│ test_shard_localhost                           │
│ test_shard_localhost_secure                     │
│ test_unavailable_shard                         │
└─────────────────────────────────────────────────┘8 rows in set. Elapsed: 0.002 sec.

7)在 Node01 上执行建表语句

➢ 会自动同步到 Node02 和 Node03 上

➢ 集群名字要和配置文件中的一致

➢ 分片和副本名称从配置文件的宏定义中获取

1
2
3
4
5
6
7
8
9
10
create table st_fz_order_mt_01 on cluster gmall_cluster (
id UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine
=ReplicatedMergeTree('/clickhouse/tables/{shard}/st_fz_order_mt_01','{replica}')
partition by toYYYYMMDD(create_time)
primary key (id)
order by (id,sku_id);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
superset-BI :) create table st_fz_order_mt_01 on cluster gmall_cluster (
               id UInt32,
               sku_id String,
               total_amount Decimal(16,2),
               create_time Datetime
               ) engine
               =ReplicatedMergeTree('/clickhouse/tables/{shard}/st_fz_order_mt_01','{replica}')
               partition by toYYYYMMDD(create_time)
               primary key (id)
               order by (id,sku_id);
CREATE TABLE st_fz_order_mt_01 ON CLUSTER gmall_cluster
(
    `id` UInt32,
    `sku_id` String,
    `total_amount` Decimal(16, 2),
    `create_time` Datetime
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/st_fz_order_mt_01', '{replica}')
PARTITION BY toYYYYMMDD(create_time)
PRIMARY KEY id
ORDER BY (id, sku_id)
Query id: b7818894-861b-443c-86e7-41832716eb34
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ Node1 │ 9000 │      0 │       │                   2 │                2
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ Node2 │ 9000 │      0 │       │                   1 │                0
│ Node3 │ 9000 │      0 │       │                   0 │                0
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
3 rows in set. Elapsed: 0.166 sec.

等三个节点都完事儿即可。


在Node02和Node03上查看表是否创建成功

show tables;


8)在 Node02 上创建 Distribute 分布式表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
date1002 :) create table st_fz_order_mt_all2 on cluster gmall_cluster
            (
            id UInt32,
            sku_id String,
            total_amount Decimal(16,2),
            create_time Datetime
            )engine = Distributed(gmall_cluster,default, st_fz_order_mt_01,hiveHash(sku_id));
CREATE TABLE st_fz_order_mt_all2 ON CLUSTER gmall_cluster
(
    `id` UInt32,
    `sku_id` String,
    `total_amount` Decimal(16, 2),
    `create_time` Datetime
)
ENGINE = Distributed(gmall_cluster, default, st_fz_order_mt_01, hiveHash(sku_id))
Query id: e447cdff-133f-4159-99bd-038d573ce8c8
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ Node2 │ 9000 │      0 │       │                   2 │                0
│ Node1 │ 9000 │      0 │       │                   1 │                0
│ Node3 │ 9000 │      0 │       │                   0 │                0
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
3 rows in set. Elapsed: 0.131 sec.

参数含义:

Distributed(集群名称,库名,本地表名,分片键)

分片键必须是整型数字,所以用 hiveHash 函数转换,也可以 rand()


9)在 Node01 上插入测试数据

1
2
3
4
5
6
insert into st_order_mt_all2 values
(201,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(202,'sku_002',2000.00,'2020-06-01 12:00:00'),
(203,'sku_004',2500.00,'2020-06-01 12:00:00'),
(204,'sku_002',2000.00,'2020-06-01 12:00:00'),
(205,'sku_003',600.00,'2020-06-02 12:00:00');

10)通过查询分布式表和本地表观察输出结果

(1)分布式表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
superset-BI :) select * From st_fz_order_mt_all2;
SELECT *
FROM st_fz_order_mt_all2
Query id: d8b676e9-c119-4483-8ca2-f0b5cd150a61
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
202 │ sku_002 │         20002020-06-01 12:00:00
203 │ sku_004 │         25002020-06-01 12:00:00
204 │ sku_002 │         20002020-06-01 12:00:00
└─────┴─────────┴──────────────┴─────────────────────┘
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
205 │ sku_003 │         6002020-06-02 12:00:00
└─────┴─────────┴──────────────┴─────────────────────┘
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
201 │ sku_001 │         10002020-06-01 12:00:00
└─────┴─────────┴──────────────┴─────────────────────┘

(2)本地表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Node1:
superset-BI :) select * From st_fz_order_mt_01;
SELECT *
FROM st_fz_order_mt_01
Query id: ddcb5176-e443-4253-9877-57fec8f57311
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
202 │ sku_002 │         20002020-06-01 12:00:00
203 │ sku_004 │         25002020-06-01 12:00:00
204 │ sku_002 │         20002020-06-01 12:00:00
└─────┴─────────┴──────────────┴─────────────────────┘
3 rows in set. Elapsed: 0.002 sec.

Node2:

Node3:
date1001 :) select * From st_fz_order_mt_01;
SELECT *
FROM st_fz_order_mt_01
Query id: 7a336004-7040-4098-948e-1e7c5d983edb
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
205 │ sku_003 │         6002020-06-02 12:00:00
└─────┴─────────┴──────────────┴─────────────────────┘
┌──id─┬─sku_id──┬─total_amount─┬─────────create_time─┐
201 │ sku_001 │         10002020-06-01 12:00:00
└─────┴─────────┴──────────────┴─────────────────────┘
2 rows in set. Elapsed: 0.002 sec.

可以看到数据分布在Node1和Node3两个节点上。

5. ClickHouse到底改写本地表还是分布式表

如果预估自己的业务数据量不大(日增不到百万行), 那么写分布式表和本地表都可以, 但要注意如果选择写本地表, 请保证每次写入数据都建立新的连接, 且每个连接写入的数据量基本相同

如果预估自己的业务数据量大(日增百万以上, 并发插入大于10), 那么请写本地表

建议每次插入50W行左右数据, 最多不可超过100W行. 总之CH不像MySQL要小事务. 比如1000W行数据, MySQL建议一次插入1W左右, 使用小事务, 执行1000次. CH建议20次,每次50W. 这是MergeTree引擎原理决定的, 频繁少量插入会导致data part过多, 合并不过来.

再有, AP不像TP, TP为了避免建立新连接产生的损耗影响性能, 通常会使用长连接, 连接池等技术做优化. 但AP业务不需要, 因为AP的属性就不会有高并发, 小SQL.

原因请继续看:

看了一些文档都建议写分布式表, 虽说别人没必要骗我们, 但是搞技术的不能人云亦云, 还是要明白为啥, 说实话我想了很久没想出直接写分布式表有什么致命缺陷, 于是在ClickHouse中文社区提了问题, 内容如下

看了sina高鹏大佬的分享,看了https://github.com/ClickHouse/ClickHouse/issues/1854 ,还看了一些文章都是建议写本地表而不是分布式表**
如果我设置 internal_replication=true , 使用 ReplicatedMergeTree 引擎, 除了写本地表更灵活可控外, 写分布式表到底有什么致命缺陷吗?
因为要给同事解释, 只说一个大家说最佳实践是这样是不行的… 我自己也没理解到底写分布式表有啥大缺陷
如果说造成数据分布不均匀, sharding key 我设为 rand() 还会有很大不均匀吗? 如果说扩容, 我也可以通过调整 weight 控制数据尽量写入新 shared 啊?

难道是因为:

Data is written asynchronously. When inserted in the table, the data block is just written to the local file system. The data is sent to the remote servers in the background as soon as possible. The period for sending data is managed by the distributed_directory_monitor_sleep_time_ms and distributed_directory_monitor_max_sleep_time_ms settings. The Distributed engine sends each file with inserted data separately, but you can enable batch sending of files with the distributed_directory_monitor_batch_inserts setting. This setting improves cluster performance by better utilizing local server and network resources. You should check whether data is sent successfully by checking the list of files (data waiting to be sent) in the table directory: /var/lib/clickhouse/data/database/table/.

If the server ceased to exist or had a rough restart (for example, after a device failure) after an INSERT to a Distributed table, the inserted data might be lost. If a damaged data part is detected in the table directory, it is transferred to the ‘broken’ subdirectory and no longer used.

上面文档内容我理解意思是说假如我有S1 S2 S3 三个节点,每个节点都有local表和分布式表. 我向S1的分布式表写数据1, 2, 3,
1写入S1, 2,3先写到S1本地文件系统, 然后异步发送到S2 S3 , 比如2发给S2, 3发给S3, 如果此时S3宕机了, 则3发到S3失败, 但是1,2还是成功写到S1,S2了? 所以整个过程不能保证原子性? 出现问题还要人为修数据?

https://github.com/ClickHouse/ClickHouse/issues/1343 这个issue说S3 come back后S1会尝试重新发送数据给S3.

Data blocks are written in /var/lib/clickhouse/data/database/table/ folder. Special thread checks directory periodically and tries to send data. If it can’t, it will try next time.

那么只剩文档最后一句意思是如果S1过程中宕机, 会丢数据?
自问自答一下吧, weight 是分片级别的, 不是表级别的, 灵活性差

问了下新浪的高鹏

高鹏的回答总结一下就是:

  1. 新浪每天增量是千亿级的, INSERT并发和节点数应该比较高, 直接写某个节点的分布式表, 这个节点还需要建立N-1个连接(N是集群节点数)分发数据, 再有就是他说的第3点
  2. 通过负载均衡向本地表插入数据要控制尽量每次插入数据建立一次连接, 每个链接插入的数据量要差不多, batch size不能太小, 否则data part过多, merge不过来clickhouse会报错

问题:

  1. 大佬,请问一下如果是写本地表,怎么进行hash的操作,让相同的主键插入到相同的节点呢(为了join和in关联时提高效率)
    那感觉得在应用层做了, 做hash后得到节点ip, 连接这个节点插入数据. 我这边是通过域名, 域名对应所有节点ip, 轮询的, 所以做不到你这种需求, 我看chproxy貌似也不行
  2. 补充两点:
    1、Distributed表在写入时会在本地节点生成临时数据,会产生写放大,所以会对CPU及内存造成一些额外消耗,建议尽量少使用Distributed表进行写操作;
    2、Distributed表写的临时block会把原始block根据sharding_key和weight进行再次拆分,会产生更多的block分发到远端节点,也增加了merge的负担;