GBase 8a支持从Kafka 加载数据,本文介绍该功能的使用方式并提供样例。GBase 8a当前支持单次加载,以及以类似定时任务的形式自动加载。
目录导航
环境
kafka版本
kafka_2.13-2.8.0
数据
原始topic一共两行数据, 名字为gbase8a,主机名为rh_210,对应IP是10.0.2.210.
[root@localhost kafka]# bin/kafka-console-consumer.sh --topic gbase8a --from-beginning --bootstrap-server localhost:9092
1234,First
5678,Second
^CProcessed a total of 2 messages
[root@localhost kafka]#
[root@localhost kafka]# bin/kafka-console-consumer.sh --topic gbase8a --from-beginning --bootstrap-server localhost:9092
1234,First
5678,Second
^CProcessed a total of 2 messages
操作系统
CentOS 7.9
单次加载样例
指通过LOAD语句,从kafka加载一次数据的方法,针对kafka的流式数据,后面有自动定时加载的方法。
语法
如下只给出协议数据源部分,完整的样例看后面。 和加载有关的完整语法请参考 GBase 8a 集群加载数据LOAD的方法 https://www.gbase8.cn/121
kafka://broker/topic[?[duration=XX][&][partition=partitionid|offset][#frombeginning]
- broker:kafka的IP和端口,比如10.0.2.201:9092
- topic:kafka的数据源的topic名字,注意不要有横线,只包含常见的字母数字和下划线,不要有特殊字符,横线等。
- 参数部分以问号开始,多个参数间用&分割
- duration:获取数据提交间隔。当达到该时间后,将提交这部分数据,保存到数据库。一般加载建议,30-300秒都是合适的。太短的间隔会导致数据库磁盘负载增加。
- partition:kafka里的分区。
- partitionid 分区编号
- offset:偏移量。 最初的数据从0开始,但存在老化删除的情况,可以用 #frombeginning从头开始
- #frombeginning:整个topic从头开始。注意不一定是0,因为有老化。也请注意这个参数和partition的区别,那个是指定某个partition从头开始,这个用来指定整个topic从头开始。不能出现2次,因为从语义上是冲突的。
Kafka单次加载进度元数据表
在gclusterdb库下面,保存了和该表Kafka加载进度的元数据。注意不是每个用户都有权限查看gclusterdb库的。表的命名规则是
topic名字_库名_表名,比如 gbase8a_testdb_tt1,表示topic是gbase8a, 库是testdb,表是tt1。样例看后面的章节。
列名 | 类型 | 说明 |
---|---|---|
scn | bigint(20) | SCN号 |
partition_offset | varchar(2048) | 偏移量,包括分区和offset |
commit_time | timestamp | 提交的时间戳 |
警告:
如果一个表,从多个kafka数据源加载,且存在了重复的topic 名字,会出现该元数据表的数据混乱。 所以建议在命名kafka里topic名字时,能加上主机名或IP。
样例
加载整个topic
其中duration设置了10秒,从头开始加载。
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=10000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:10.70)
Task 28673 finished, Loaded 2 records, Skipped 0 records
gbase> select * from tt1;
+------+--------+
| id | name |
+------+--------+
| 1 | First |
| 2 | Second |
+------+--------+
2 rows in set (Elapsed: 00:00:00.01)
不同duration的效果
如下分别设置1秒,2秒,5秒的的duration, 数据都是全部加载,耗时与duration有明显关系。内部是要在消费指定时间的数据后,再做入库提交commit动作的。
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:01.18)
Task 28674 finished, Loaded 2 records, Skipped 0 records
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:02.20)
Task 28675 finished, Loaded 2 records, Skipped 0 records
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=5000#frombeginning' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:05.25)
Task 28676 finished, Loaded 2 records, Skipped 0 records
不同partition的效果
由于测试的topic没有分区,所以指定partition=1时没有数据,而0时有2行。
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=1|0' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:02.21)
Task 28678 finished, Loaded 0 records, Skipped 0 records
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=0|0' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:02.28)
Task 28679 finished, Loaded 2 records, Skipped 0 records
指定不同offset的效果
指定1的offset发现只有1行数据入库,而0的有2行。此处的offset很关键,如果设置错了会导致丢数据或者重复数据。如果不确认,建议使用数据库自带的元数据保存功能,会自动保存已经加载的offset。详情看前面的元数据部分。
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=0|0' into table testdb.tt1;
Query OK, 2 rows affected (Elapsed: 00:00:02.28)
Task 28679 finished, Loaded 2 records, Skipped 0 records
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=2000&partition=0|1' into table testdb.tt1;
Query OK, 1 row affected (Elapsed: 00:00:02.22)
Task 28680 finished, Loaded 1 records, Skipped 0 records
使用Kafka加载进度元数据自动加载offset
在加载一次后,已经将当前加载进度保存到元数据。如果不强行指定offset则不会重复消费。
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.29)
Task 28683 finished, Loaded 0 records, Skipped 0 records
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.14)
Task 28684 finished, Loaded 0 records, Skipped 0 records
尝试产生一些新数据
[root@rh7_210 kafka]# bin/kafka-console-producer.sh --topic gbase8a --bootstrap-server localhost:9092
>3,Third
>
再次加载
可以加载新产生的数据。
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 1 row affected (Elapsed: 00:00:01.23)
Task 28685 finished, Loaded 1 records, Skipped 0 records
gbase> select * from tt1;
+------+--------+
| id | name |
+------+--------+
| 1 | First |
| 2 | Second |
| 2 | Second |
| 2 | Second |
| 1 | First |
| 2 | Second |
| 3 | Third |
+------+--------+
7 rows in set (Elapsed: 00:00:00.01)
查看Kafka加载进度元数据记录
可以看到每次加载的时间,偏移量等信息。注意正常使用时,不要将该表删除,否则会出现重复加载。后面会测试。
gbase> use gclusterdb;
Query OK, 0 rows affected (Elapsed: 00:00:00.01)
gbase> select * from gbase8a_testdb_tt1;
+-------+------------------+---------------------+
| scn | partition_offset | commit_time |
+-------+------------------+---------------------+
| 28673 | 0:2 | 2021-06-10 11:53:39 |
| 28674 | 0:2 | 2021-06-10 11:57:08 |
| 28675 | 0:2 | 2021-06-10 11:57:17 |
| 28676 | 0:2 | 2021-06-10 11:57:26 |
| 28678 | 1:0 | 2021-06-10 11:58:59 |
| 28679 | 0:2 | 2021-06-10 11:59:08 |
| 28680 | 0:2 | 2021-06-10 11:59:24 |
| 28681 | 0:2 | 2021-06-10 12:00:19 |
| 28682 | 0:2 | 2021-06-10 12:05:02 |
| 28685 | 0:3 | 2021-06-10 12:06:36 |
+-------+------------------+---------------------+
10 rows in set (Elapsed: 00:00:00.01)
尝试删除Kafka加载进度元数据表的后果
如果删除了,则进度记录丢失,会导致从0开始重新加载。同时生成了一个新的Kafka加载进度元数据表和1行记录。
gbase> drop table gbase8a_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:00.40)
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000' into table testdb.tt1;
Query OK, 3 rows affected (Elapsed: 00:00:01.32)
Task 28689 finished, Loaded 3 records, Skipped 0 records
gbase> select * from gbase8a_testdb_tt1;
+-------+------------------+---------------------+
| scn | partition_offset | commit_time |
+-------+------------------+---------------------+
| 28689 | 0:3 | 2021-06-10 12:13:11 |
+-------+------------------+---------------------+
1 row in set (Elapsed: 00:00:00.01)
加载指定partition的起始位置
partition 指定分区,后面的偏移必须提供参数。可以指定具体的量,也可以用#frombeginning从头开始。 注意不一定是0, 因为数据会老化删除。
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=0' into table testdb.tt1;
ERROR 1733 (HY000): (GBA-01EX-700) Gbase general error: partition format error
gbase>
gbase>
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=0#frombeginning' into table testdb.tt1;
ERROR 1733 (HY000): (GBA-01EX-700) Gbase general error: partition format error
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=0|#frombeginning' into table testdb.tt1;
Query OK, 3 rows affected (Elapsed: 00:00:01.41)
Task 28691 finished, Loaded 3 records, Skipped 0 records
gbase> load data infile 'kafka://rh7_210:9092/gbase8a?duration=1000&partition=1|#frombeginning' into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.21)
Task 28692 finished, Loaded 0 records, Skipped 0 records
定时加载Kafka数据
GBase 8a支持通过consumer任务定时加载kafka数据。
语法
创建Kafka 定时加载的消费者
create kafka consumer CONSUMER_NAME loader topic TOPIC_NAME brokers 'BROKER[,BROKERn..]' duration DURATION_TIME_MS into table [DBNAME.]TABLENAME;
- CONSUMER_NAME: 本次consumer的名字。
- TOPIC_NAME : kafka里topic的名字
- BROKER: Kafka所在地址,如果有多个,用逗号分割
- DURATION_TIME_MS : 消费Kafka数据间隔,单位是毫秒。建议30-300秒以上。
- DBNAME : 库名
- TABLENAME 表名
启动Kafka 定时加载的消费者
start kafka consumer CONSUMER_NAME ;
CONSUMER_NAME: Kafka加载消费的consumer的名字。
查看Kafka消费者状态
show kafka consumer CONSUMER_NAME;
删除Kafka消费者
drop kafka consumer CONSUMER_NAME;
定时加载Kafka数据的元数据表
记录了当前的加载进度。kafka_load_DBNAME_TABLENAME_DBNAME_TABLENAME。 其中库名表名出现了2次,尚不清楚原因。
列名 | 类型 | 说明 |
---|---|---|
scn | bigint(20) | SCN号 |
partition_offset | varchar(2048) | 偏移量,包括分区和offset |
commit_time | timestamp | 提交的时间戳 |
定时加载Kafka数据样例
创建Kafka加载consumer
gbase> create kafka consumer kafka_load_testdb_tt1 loader topic gbase8a brokers '10.0.2.210:9092' duration 3000 into table testdb.tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.52)
create consumer done.
启动Kafka加载consumer
gbase> stop kafka consumer kafka_load_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:01.89)
stop consumer done.
gbase>
gbase>
gbase> start kafka consumer kafka_load_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:02.43)
start consumer done.
查看数据并测试产生新数据
gbase> select * from tt1;
+------+--------+
| id | name |
+------+--------+
| 1 | First |
| 2 | Second |
| 3 | Third |
+------+--------+
3 rows in set (Elapsed: 00:00:00.01)
产生新数据
[root@rh7_210 kafka]# bin/kafka-console-producer.sh --topic gbase8a --bootstrap-server localhost:9092
>4,Four
>5,Five
>^C[root@rh7_210 kafka]
再次查看,自动加载了新的Kafka里的topic的数据。
gbase> select * from tt1;
+------+--------+
| id | name |
+------+--------+
| 1 | First |
| 2 | Second |
| 3 | Third |
| 4 | Four |
| 5 | Five |
+------+--------+
5 rows in set (Elapsed: 00:00:00.00)
再次测试
[root@rh7_210 kafka]# bin/kafka-console-producer.sh --topic gbase8a --bootstrap-server localhost:9092
>6,Six
>7,seven
>8,eight
>^C[root@rh7_210 kafka]#
gbase> select * from tt1;
+------+--------+
| id | name |
+------+--------+
| 1 | First |
| 2 | Second |
| 3 | Third |
| 4 | Four |
| 5 | Five |
| 6 | Six |
| 7 | seven |
| 8 | eight |
+------+--------+
8 rows in set (Elapsed: 00:00:00.01)
查看Kafka定时加载进度元数据表
从内部看到每次加载的偏移量,以及提交时间。
gbase> select * from kafka_load_testdb_tt1_testdb_tt1;
+-------+------------------+---------------------+
| scn | partition_offset | commit_time |
+-------+------------------+---------------------+
| 0 | 0:-2 | 2021-06-10 13:46:04 |
| 29699 | 0:3 | 2021-06-10 13:56:45 |
| 29722 | 0:4 | 2021-06-10 13:58:27 |
| 29723 | 0:5 | 2021-06-10 13:58:30 |
| 29729 | 0:6 | 2021-06-10 13:58:51 |
| 29731 | 0:7 | 2021-06-10 13:58:58 |
| 29732 | 0:8 | 2021-06-10 13:59:02 |
+-------+------------------+---------------------+
7 rows in set (Elapsed: 00:00:00.01)
查看消费者状态
gbase> show kafka consumer kafka_load_testdb_tt1;
+-----------------------+------+--------+-------+---------+-----------------+------------+----------+----------------+--------+
| NAME | TYPE | DB | TABLE | TOPIC | BROKERS | PARTITIONS | DURATION | LOADER_OPTIONS | STATUS |
+-----------------------+------+--------+-------+---------+-----------------+------------+----------+----------------+--------+
| kafka_load_testdb_tt1 | L | testdb | tt1 | gbase8a | 10.0.2.210:9092 | ALL | 3000 | DATA_FORMAT 3 | start |
+-----------------------+------+--------+-------+---------+-----------------+------------+----------+----------------+--------+
1 row in set (Elapsed: 00:00:00.22)
- NAME : 消费者名字
- TYPE : L = LOAD , 没测试过OGG, 估计另外一个是O, 代表从Oracel的同步
- DB : 库名
- TABLE : 表名
- TOPIC : kafka的topic名字
- BROKERS : Kafka地址
- PARTITIONS : 分区
- DURATION : 消费间隔
- LOADER_OPTIONS : 加载额外参数
- STATUS : 当前状态
删除消费者
gbase> drop kafka consumer kafka_load_testdb_tt1;
Query OK, 0 rows affected (Elapsed: 00:00:00.02)