GBase 8a和OGG + Kafka同步相关参数介绍

本文介绍GBase 8a和OGG +Kafka相关参数,主要用于consume功能,与加载导出等无关。

gbase_8a_gcluster.cnf

9.5.2版本kafka参数列表

gbase> show variables like '%kaf%';
+----------------------------------------------------+-----------+
| Variable_name                                      | Value     |
+----------------------------------------------------+-----------+
| _gbase_kafka_transaction_mode                      | OFF       |
| _t_gcluster_kafka_allow_offset_jump                | 1         |
| _t_gcluster_kafka_consumer_compare_field_only_once | 0         |
| _t_gcluster_kafka_consumer_force_compare_field     | 1         |
| _t_gcluster_kafka_ignore_when_update_not_hit       | 0         |
| _t_gcluster_kafka_null_transform                   | 0         |
| _t_gcluster_kafka_seek_offset                      | 0         |
| _t_gcluster_kafka_trust_kafka_returned_offset      | 0         |
| _t_kafka_varchar_auto_truncate                     | 0         |
| gbase_kafka_broker_version                         |           |
| gbase_kafka_keytab                                 |           |
| gbase_kafka_principal                              |           |
| gcluster_kafka_batch_commit_dml_count              | 100000    |
| gcluster_kafka_consume_batch                       | 10        |
| gcluster_kafka_consume_timeout                     | 2000      |
| gcluster_kafka_consumer_escape_zero                | 0         |
| gcluster_kafka_consumer_latency_time_statistics    | 0         |
| gcluster_kafka_consumer_output_charset_name        |           |
| gcluster_kafka_consumer_special_use_charset_name   |           |
| gcluster_kafka_consumer_support_partial_update     | 0         |
| gcluster_kafka_data_buf_size                       | 0         |
| gcluster_kafka_dataflow                            | 0         |
| gcluster_kafka_debug_on                            | 0         |
| gcluster_kafka_delete_execute_directly             | 0         |
| gcluster_kafka_ignore_if_table_not_exist           | 0         |
| gcluster_kafka_ignore_pos_field                    | 0         |
| gcluster_kafka_loader_max_start_count              | 20        |
| gcluster_kafka_local_queue_size                    | 201000    |
| gcluster_kafka_max_message_size                    | 100000000 |
| gcluster_kafka_parallel_commit                     | 1         |
| gcluster_kafka_primarykey_can_be_null              | 0         |
| gcluster_kafka_result_check                        | 0         |
| gcluster_kafka_user_allowed_max_latency            | 10000     |
+----------------------------------------------------+-----------+
33 rows in set (Elapsed: 00:00:00.00)

gcluster_kafka_consumer_enable

打开 kafka consumer 功能,如果不打开,则 consumer 相关命令都不可用(报错)

  • 1 打开
  • 0 关闭, 默认

该参数在集群show 时看不到,但在默认的配置文件里能看到。

gcluster_assign_kafka_topic_period

自动接管 consumer 的时间周期,单位为秒。

例如 A 节点宕机了,最大需要等待 gcluster_assign_kafka_topic_period秒之后,A 节点负责的同步任务会被其他节点接管。最小值 20s,最大值 120s。

gcluster_kafka_max_message_size

从 kafka topic 获得消息的最大长度,
单位为字节,最大值 1000000000 字节,这个值需要大于等于 kafka server 的配置(message.max.bytes),否则可能造成消费问题,如果 kafka 队列中存在一条消息,其大小超过 gcluster_kafka_max_message_size 就会造成消费卡住。

gcluster_kafka_batch_commit_dml_count

一次提交 dml 操作的数量。

适当调大能明显提高性能,但是如果一个 topic 涉及的表很多(几百个表)则建议该参数调小,表越多越应该调小,调小的目的是使得一次提交命中的表少一些,具体需要结合具体用户场景、同步速度、资源占用情况具体对待。未来启用新事务后,表数量多对性能的影响会降低,会再次更新手册。需要注意的是,此参数是一个意向值,程序未必会严格按照此参数来提交,比如如果一个事务包含大量 DML 操作,那么程序必须确保事务完整性;再比如从 kafka 取消息、解析消息的速度慢于往单机提交数据的速度,那么程序也会选择先提交,而不是一定要等待满足 gcluster_kafka_batch_commit_dml_count 参数。

gcluster_kafka_user_allowed_max_latency

允许消息在 GBase 8a MPP Cluster 集群层缓存多长时间,超时之后必须马上提交,单位是毫秒。

此参数与gcluster_kafka_batch_commit_dml_count 作用类似,都是决定什么时候提交的。多攒一些数据再提交,有利于降低磁盘占用,如果用户对数据延迟不太敏感,而对磁盘占用比较敏感,可以通过这个参数来调节。典型值一般可以设置为 50000~20000,需要注意提交动作本身也需要消耗时间。

gcluster_kafka_local_queue_size

储存 dml 操作的队列的长度,建议至少为 gcluster_kafka_batch_commit_dml_count 的二倍多一些。

gcluster_kafka_consume_batch

consumer 一次读取 kafka 消息的条数。
如果 kafka 队列里的消息 size 较小,可以设大,反之设小,此参数对性能的影响不大,所以一般没必要设太大,建议设为 10~1000。

gcluster_kafka_ignore_pos_field

控制单个 consumer 是否比对 POS(防止
重复消费)。客户多线程往 kafka 中写入数据,写入 kafka 的数据不能确保 POS有序,原 consumer 消费数据时会做 POS 检查导致无序的数据入库时会有遗漏。现在参数 gcluster_kafka_ignore_pos_field,控制 consumer 是否进行 POS 检查。

  • POS 检查开启,consumer 消费时会丢弃已消费序号之前的消息;
  • POS 检查关闭,consumer 会将 kafka 的每条消息均入库,所以需要生产端确保发送到 kafka的消息无重复。
  • 默认值为 0,即检查重复消息;
  • 值为 1 时,不检查重复消息。

用于 Consumer 消费 only insert 消息,客户能保证 kafka 消息无重复的特殊场景。配置方法可以手动修改 gclusterdb.kafka_consumers。如:

Update gclusterdb.kafka_consumers set common_options='gcluster_kafka_ignore_pos_field=1' where name='consumer_1';

最后重启consumer_1。

_t_kafka_varchar_auto_truncate

在consumer消费kafka信息时,遇到长度超数据库定义长度的字段(仅限 varchar 类型),开启可以自动进行截位并正常消费入库模式。缺省值为 0;设置值为 1 时,表示让 consumer 对 json消息中的 after 内容进行长度判断,如果长度超过了目标表的列宽,则自动按列宽(字符长度)截断,只对 varchar 列做处理。

gcluster_kafka_message_format_type

设定 consumer 在解析 kafka 消息时,以什么格式来解析。
取值范围:JSON、PUREDATA、AUTO_DETECT
说明:

  • puredata 对应 rtsync 生产的 protobuf 消息;
  • AUTO_DETECT(默认)是让 consumer 自己侦测消息格式,这时候 consumer 会先尝试用 puredata 格式进行解析,通过就认为是 puredata 格式,否则就认为是json 格式。

注:consumer 启动后,只在解析第一条消息时做这个判断,后面直接用这个判断结果。

gcluster_kafka_ignore_pos_field

控制单个 consumer 是否比对 POS(防止重复消费)。

客户多线程往 kafka 中写入数据,写入 kafka 的数据不能确保 POS 有序,原 consumer 消费数据时会做 POS 检查导致无序的数据入库时会有遗漏。参数 gcluster_kafka_ignore_pos_field,控制 consumer 是否进行 POS 检查。

  • POS检查开启,consumer 消费时会丢弃已消费序号之前的消息;
  • POS 检查关闭,consumer 会将 kafka 的每条消息均入库,所以需要生产端确保发送到 kafka 的消息无重复。

配置方法:手动修改 gclusterdb.kafka_consumers

Update gclusterdb.kafka_consumers set common_options='gcluster_kafka_ignore_pos_field=1' where name='consumer_1';

重启 consumer_1。

gcluster_kafka_broker_version

设定 kafka server 的版本,例如 0.9.0,0.8.2… 当 kafka server 低于 0.10 版本的时候必须设置此参数,而高于 0.10版本不要设置。

gbase_kafka_principal

配置 kafka kerberos principal

gbase_kafka_keytab

配置 kafka kerberos keytab file path

gcluster_kafka_ignore_if_table_not_exist

consumer 处理一个消息时,如果消息指定的目标表不存在,是否自动忽略此消息。1 代表忽略。

gcluster_kafka_parallel_commit

consumer 向 gnode 发送 sql 是否采用并行方式。默认值是 1,代表不并行。这个参数目前不要使用,会造成主备不一致,或者 delete 不掉数据。