Flink Catalog介绍

1.FlinkCatalog介绍

1.18.3.1.引言

以下转自:http://legendtkl.com/2020/07/26/flink-catalog/

这篇文章我们介绍了一下Flink的Catalog,基于Flink1.11,熟悉Flink或者Spark等大数据引擎的同学应该都知道这两个计算引擎都有一个共同的组件叫Catalog。下面是Flink的Catalog的官方定义。

1
2
3
4
5
6
Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。
元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的UDF。
元数据也可以是持久化的,例如HiveMetastore中的元数据。
Catalog提供了一个统一的API,用于管理元数据,并使其可以从TableAPI和SQL查询语句中来访问。

简单来说,Catalog就是元数据管理中心,其中元数据包括数据库、表、表结构等信息。

1.18.3.2.Catalog定义

Flink的Catalog相关代码定义在catalog.java文件中,是一个interface,如下。

1
2
3
4
5
6
7
8
/**
*Thisinterfaceisresponsibleforreadingandwritingmetadatasuchasdatabase/table/views/UDFs
*fromaregisteredcatalog.ItconnectsaregisteredcatalogandFlink'sTableAPI.
*/
@PublicEvolving
publicinterfaceCatalog{
...
}

既然是interface,我们来看一下支持的操作。

在这里插入图片描述

我们可以将这些接口做一个简单的分类。

  1. Database相关操作
    getDefaultDataBase:获取默认的database
    getDatabase:获取特定的database
    listDatabases:列出所有的database
    databaseExists:判断database是否存在
    createDatabases:创建database
    dropDatabases:删除database
    alterDatabases:修改database
  2. Table相关操作,一般都会有个参数是database
    listTables:列出所有的table和view
    getTable:获取指定的table或者view
    tableExist:判断table或者view是否存在
    dropTable:删除table或者view
    createTable:创建table或者view
    renameTable:重命名table或者view
    alterTable:修改table或者view
  3. View相关操作,除了和table共用方法外,还有一个独有的方法。
    listViews:列出所有的view
  4. Partition相关操作,partition是table的一个属性,所以参数一般都会带有table信息。
    listPartition:列出table的所有partition
    getPartition:获取指定的partition
    partitionExist:判断parition是否存在
    createPartition:创建partition
    dropPartition:删除partition
    alterPartition:修改parition
  5. Function相关操作,这里的function知道的是用户自定义的function,也就是Udf。
    listFunctions:列出所有的function
    getFunction:获取指定的func
    functionExist:判断function是否存在
    dropFunction:删除function
    alterFunction:修改function

1.18.3.3.Catalog的实现

在这里插入图片描述

从上图我们可以看到Catalog的最终实现有三个类:
HiveCatalog:使用Hive的元数据来作为Flink的HiveCatalog
GenericInMemoryCatalog:使用内存实现Catalog
JdbcCatalog:使用其他支持jdbc协议的关系型数据库来存储元数据
PostgresCatalog:使用Postgres数据库来作为Catalog存储元数据

1.18.3.4.Catalog使用举例

下面的示例是FlinkSQL使用Catalog的示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
TableEnvironmenttableEnv=...

//CreateaHiveCatalog
Catalogcatalog=newHiveCatalog("myhive",null,"<path_of_hive_conf>","<hive_version>");

//Registerthecatalog
tableEnv.registerCatalog("myhive",catalog);

//Createacatalogdatabase
tableEnv.executeSql("CREATEDATABASEmydbWITH(...)");

//Createacatalogtable
tableEnv.executeSql("CREATETABLEmytable(nameSTRING,ageINT)WITH(...)");

tableEnv.listTables();//shouldreturnthetablesincurrentcataloganddatabase.

下面是api的方式来使用Catalog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
importorg.apache.flink.table.api.*;
importorg.apache.flink.table.catalog.*;
importorg.apache.flink.table.catalog.hive.HiveCatalog;
importorg.apache.flink.table.descriptors.Kafka;

TableEnvironmenttableEnv=TableEnvironment.create(EnvironmentSettings.newInstance().build());

//CreateaHiveCatalog
Catalogcatalog=newHiveCatalog("myhive",null,"<path_of_hive_conf>","<hive_version>");

//Registerthecatalog
tableEnv.registerCatalog("myhive",catalog);

//Createacatalogdatabase
catalog.createDatabase("mydb",newCatalogDatabaseImpl(...));

//Createacatalogtable
TableSchemaschema=TableSchema.builder()
.field("name",DataTypes.STRING())
.field("age",DataTypes.INT())
.build();

catalog.createTable(
newObjectPath("mydb","mytable"),
newCatalogTableImpl(
schema,
newKafka()
.version("0.11")
....
.startFromEarlist()
.toProperties(),
"mycomment"
),
false
);

List<String>tables=catalog.listTables("mydb");//tablesshouldcontain"mytable"

1.18.3.5.自定义Catalog

Catalog是可扩展的,用户可以通过实现Catalog接口来开发自定义Catalog。想要在SQLCLI中使用自定义Catalog,用户除了需要实现自定义的Catalog之外,还需要为这个Catalog实现对应的CatalogFactory接口。

CatalogFactory定义了一组属性,用于SQLCLI启动时配置Catalog。这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到CatalogFactory并初始化相应的Catalog实例。

1.18.3.6.总结

这篇文章写的比较简单,相当于自己的学习笔记,下一篇文章我们比较一下Spark的Catalog实现。

2.AlinkCatalog