GBase 8a从Kafka的Topic加载数据使用样例

GBase 8a支持从Kafka 加载数据,本文介绍该功能的使用方式并提供样例。GBase 8a当前支持单次加载,以及以类似定时任务的形式自动加载。

环境

kafka版本

kafka_2.13-2.8.0

数据

原始topic一共两行数据, 名字为gbase8a,主机名为rh_210,对应IP是10.0.2.210.

[root@localhost kafka]# bin/kafka-console-consumer.sh --topic gbase8a --from-beginning --bootstrap-server localhost:9092
1234,First
5678,Second
^CProcessed a total of 2 messages
[root@localhost kafka]#
[root@localhost kafka]# bin/kafka-console-consumer.sh --topic gbase8a --from-beginning --bootstrap-server localhost:9092
1234,First
5678,Second
^CProcessed a total of 2 messages

操作系统

CentOS 7.9

单次加载样例

指通过LOAD语句,从kafka加载一次数据的方法,针对kafka的流式数据,后面有自动定时加载的方法。

语法

如下只给出协议数据源部分,完整的样例看后面。 和加载有关的完整语法请参考 GBase 8a 集群加载数据LOAD的方法 https://www.gbase8.cn/121

kafka://broker/topic[?[duration=XX][&][partition=partitionid|offset][#frombeginning]
  • broker:kafka的IP和端口,比如10.0.2.201:9092
  • topic:kafka的数据源的topic名字,注意不要有横线,只包含常见的字母数字和下划线,不要有特殊字符,横线等。
  • 参数部分以问号开始,多个参数间用&分割
    • duration:获取数据提交间隔。当达到该时间后,将提交这部分数据,保存到数据库。一般加载建议,30-300秒都是合适的。太短的间隔会导致数据库磁盘负载增加。
    • partition:kafka里的分区。
      • partitionid 分区编号
      • offset:偏移量。 最初的数据从0开始,但存在老化删除的情况,可以用 #frombeginning从头开始
  • #frombeginning:整个topic从头开始。注意不一定是0,因为有老化。也请注意这个参数和partition的区别,那个是指定某个partition从头开始,这个用来指定整个topic从头开始。不能出现2次,因为从语义上是冲突的。

Kafka单次加载进度元数据表

在gclusterdb库下面,保存了和该表Kafka加载进度的元数据。注意不是每个用户都有权限查看gclusterdb库的。表的命名规则是

topic名字_库名_表名,比如 gbase8a_testdb_tt1,表示topic是gbase8a, 库是testdb,表是tt1。样例看后面的章节。

列名类型说明
scnbigint(20)SCN号
partition_offsetvarchar(2048)偏移量,包括分区和offset
commit_timetimestamp提交的时间戳

警告:

如果一个表,从多个kafka数据源加载,且存在了重复的topic 名字,会出现该元数据表的数据混乱。 所以建议在命名kafka里topic名字时,能加上主机名或IP。

样例

加载整个topic

其中duration设置了10秒,从头开始加载。

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=10000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:10.70)
Task 28673 finished, Loaded 2 records, Skipped 0 records

gbase> select * from tt1;
+------+--------+
| id   | name   |
+------+--------+
|    1 | First  |
|    2 | Second |
+------+--------+
2 rows in set (Elapsed: 00:00:00.01)

不同duration的效果

如下分别设置1秒,2秒,5秒的的duration, 数据都是全部加载,耗时与duration有明显关系。内部是要在消费指定时间的数据后,再做入库提交commit动作的。

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:01.18)
Task 28674 finished, Loaded 2 records, Skipped 0 records

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:02.20)
Task 28675 finished, Loaded 2 records, Skipped 0 records

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=5000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:05.25)
Task 28676 finished, Loaded 2 records, Skipped 0 records

不同partition的效果

由于测试的topic没有分区,所以指定partition=1时没有数据,而0时有2行。

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=1|0' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:02.21)
Task 28678 finished, Loaded 0 records, Skipped 0 records

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=0|0' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:02.28)
Task 28679 finished, Loaded 2 records, Skipped 0 records

指定不同offset的效果

指定1的offset发现只有1行数据入库,而0的有2行。此处的offset很关键,如果设置错了会导致丢数据或者重复数据。如果不确认,建议使用数据库自带的元数据保存功能,会自动保存已经加载的offset。详情看前面的元数据部分。

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=0|0' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:02.28)
Task 28679 finished, Loaded 2 records, Skipped 0 records

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=0|1' into table testdb.tt1;
Query OK, 1 row affected (Elapsed: 00:00:02.22)
Task 28680 finished, Loaded 1 records, Skipped 0 records

使用Kafka加载进度元数据自动加载offset

在加载一次后,已经将当前加载进度保存到元数据。如果不强行指定offset则不会重复消费。

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.29)
Task 28683 finished, Loaded 0 records, Skipped 0 records

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.14)
Task 28684 finished, Loaded 0 records, Skipped 0 records
尝试产生一些新数据
[root@rh7_210 kafka]# bin/kafka-console-producer.sh --topic gbase8a --bootstrap-server localhost:9092
>3,Third
>
再次加载

可以加载新产生的数据。

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 1 row affected (Elapsed: 00:00:01.23)
Task 28685 finished, Loaded 1 records, Skipped 0 records

gbase> select * from tt1;
+------+--------+
| id   | name   |
+------+--------+
|    1 | First  |
|    2 | Second |
|    2 | Second |
|    2 | Second |
|    1 | First  |
|    2 | Second |
|    3 | Third  |
+------+--------+
7 rows in set (Elapsed: 00:00:00.01)

查看Kafka加载进度元数据记录

可以看到每次加载的时间,偏移量等信息。注意正常使用时,不要将该表删除,否则会出现重复加载。后面会测试。

gbase> use gclusterdb;
Query OK, 0 rows affected (Elapsed: 00:00:00.01)

gbase> select * from gbase8a_testdb_tt1;
+-------+------------------+---------------------+
| scn   | partition_offset | commit_time         |
+-------+------------------+---------------------+
| 28673 | 0:2              | 2021-06-10 11:53:39 |
| 28674 | 0:2              | 2021-06-10 11:57:08 |
| 28675 | 0:2              | 2021-06-10 11:57:17 |
| 28676 | 0:2              | 2021-06-10 11:57:26 |
| 28678 | 1:0              | 2021-06-10 11:58:59 |
| 28679 | 0:2              | 2021-06-10 11:59:08 |
| 28680 | 0:2              | 2021-06-10 11:59:24 |
| 28681 | 0:2              | 2021-06-10 12:00:19 |
| 28682 | 0:2              | 2021-06-10 12:05:02 |
| 28685 | 0:3              | 2021-06-10 12:06:36 |
+-------+------------------+---------------------+
10 rows in set (Elapsed: 00:00:00.01)

尝试删除Kafka加载进度元数据表的后果

如果删除了,则进度记录丢失,会导致从0开始重新加载。同时生成了一个新的Kafka加载进度元数据表和1行记录。

gbase> drop table gbase8a_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:00.40)

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 3 rows affected (Elapsed: 00:00:01.32)
Task 28689 finished, Loaded 3 records, Skipped 0 records

gbase> select * from gbase8a_testdb_tt1;
+-------+------------------+---------------------+
| scn   | partition_offset | commit_time         |
+-------+------------------+---------------------+
| 28689 | 0:3              | 2021-06-10 12:13:11 |
+-------+------------------+---------------------+
1 row in set (Elapsed: 00:00:00.01)

加载指定partition的起始位置

partition 指定分区,后面的偏移必须提供参数。可以指定具体的量,也可以用#frombeginning从头开始。 注意不一定是0, 因为数据会老化删除。

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=0' into table testdb.tt1;
ERROR 1733 (HY000): (GBA-01EX-700) Gbase general error: partition format error
gbase>
gbase>
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=0#frombeginning' into table testdb.tt1;
ERROR 1733 (HY000): (GBA-01EX-700) Gbase general error: partition format error
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=0|#frombeginning' into table testdb.tt1;
Query OK, 3 rows affected (Elapsed: 00:00:01.41)
Task 28691 finished, Loaded 3 records, Skipped 0 records

gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=1|#frombeginning' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.21)
Task 28692 finished, Loaded 0 records, Skipped 0 records

定时加载Kafka数据

GBase 8a支持通过consumer任务定时加载kafka数据。

语法

创建Kafka 定时加载的消费者

create kafka consumer CONSUMER_NAME loader topic TOPIC_NAME brokers  'BROKER[,BROKERn..]' duration DURATION_TIME_MS into table [DBNAME.]TABLENAME;
  • CONSUMER_NAME: 本次consumer的名字。
  • TOPIC_NAME : kafka里topic的名字
  • BROKER: Kafka所在地址,如果有多个,用逗号分割
  • DURATION_TIME_MS : 消费Kafka数据间隔,单位是毫秒。建议30-300秒以上。
  • DBNAME : 库名
  • TABLENAME 表名

启动Kafka 定时加载的消费者

start kafka consumer CONSUMER_NAME ;

CONSUMER_NAME: Kafka加载消费的consumer的名字。

查看Kafka消费者状态

show kafka consumer CONSUMER_NAME;

删除Kafka消费者

drop kafka consumer CONSUMER_NAME;

定时加载Kafka数据的元数据表

记录了当前的加载进度。kafka_load_DBNAME_TABLENAME_DBNAME_TABLENAME。 其中库名表名出现了2次,尚不清楚原因。

列名类型说明
scnbigint(20)SCN号
partition_offsetvarchar(2048)偏移量,包括分区和offset
commit_timetimestamp提交的时间戳

定时加载Kafka数据样例

创建Kafka加载consumer

gbase> create kafka consumer kafka_load_testdb_tt1 loader topic gbase8a brokers  '10.0.2.210:9092' duration 3000 into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.52)
create consumer done.

启动Kafka加载consumer

gbase> stop kafka consumer kafka_load_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.89)
stop consumer done.

gbase>
gbase>
gbase> start kafka consumer kafka_load_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:02.43)
start consumer done.

查看数据并测试产生新数据

gbase> select * from tt1;
+------+--------+
| id   | name   |
+------+--------+
|    1 | First  |
|    2 | Second |
|    3 | Third  |
+------+--------+
3 rows in set (Elapsed: 00:00:00.01)

产生新数据

[root@rh7_210 kafka]# bin/kafka-console-producer.sh --topic gbase8a --bootstrap-server localhost:9092
>4,Four
>5,Five
>^C[root@rh7_210 kafka]

再次查看,自动加载了新的Kafka里的topic的数据。

gbase> select * from tt1;
+------+--------+
| id   | name   |
+------+--------+
|    1 | First  |
|    2 | Second |
|    3 | Third  |
|    4 | Four   |
|    5 | Five   |
+------+--------+
5 rows in set (Elapsed: 00:00:00.00)

再次测试

[root@rh7_210 kafka]# bin/kafka-console-producer.sh --topic gbase8a --bootstrap-server localhost:9092
>6,Six
>7,seven
>8,eight
>^C[root@rh7_210 kafka]#

gbase> select * from tt1;
+------+--------+
| id   | name   |
+------+--------+
|    1 | First  |
|    2 | Second |
|    3 | Third  |
|    4 | Four   |
|    5 | Five   |
|    6 | Six    |
|    7 | seven  |
|    8 | eight  |
+------+--------+
8 rows in set (Elapsed: 00:00:00.01)

查看Kafka定时加载进度元数据表

从内部看到每次加载的偏移量,以及提交时间。

gbase> select * from kafka_load_testdb_tt1_testdb_tt1;
+-------+------------------+---------------------+
| scn   | partition_offset | commit_time         |
+-------+------------------+---------------------+
|     0 | 0:-2             | 2021-06-10 13:46:04 |
| 29699 | 0:3              | 2021-06-10 13:56:45 |
| 29722 | 0:4              | 2021-06-10 13:58:27 |
| 29723 | 0:5              | 2021-06-10 13:58:30 |
| 29729 | 0:6              | 2021-06-10 13:58:51 |
| 29731 | 0:7              | 2021-06-10 13:58:58 |
| 29732 | 0:8              | 2021-06-10 13:59:02 |
+-------+------------------+---------------------+
7 rows in set (Elapsed: 00:00:00.01)

查看消费者状态

gbase> show kafka consumer kafka_load_testdb_tt1;
+-----------------------+------+--------+-------+---------+-----------------+------------+----------+----------------+--------+
| NAME                  | TYPE | DB     | TABLE | TOPIC   | BROKERS         | PARTITIONS | DURATION | LOADER_OPTIONS | STATUS |
+-----------------------+------+--------+-------+---------+-----------------+------------+----------+----------------+--------+
| kafka_load_testdb_tt1 | L    | testdb | tt1   | gbase8a | 10.0.2.210:9092 | ALL        | 3000     |  DATA_FORMAT 3 | start  |
+-----------------------+------+--------+-------+---------+-----------------+------------+----------+----------------+--------+
1 row in set (Elapsed: 00:00:00.22)
  • NAME : 消费者名字
  • TYPE : L = LOAD , 没测试过OGG, 估计另外一个是O, 代表从Oracel的同步
  • DB : 库名
  • TABLE : 表名
  • TOPIC : kafka的topic名字
  • BROKERS : Kafka地址
  • PARTITIONS : 分区
  • DURATION : 消费间隔
  • LOADER_OPTIONS : 加载额外参数
  • STATUS : 当前状态

删除消费者

gbase> drop kafka consumer kafka_load_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:00.02)