GBase 8a数据库将审计日志文件加载到数据库的一个方法

本文介绍一种,从文本格式的审计日志,通过替换自定义字段和行分隔符,加载到GBase 8a数据库的方法。

参考

GBase 8a 集群审计日志audit_log攻略和使用方法

1、审计日志输出格式

默认log_output=’FILE’

2、日志文件

gcluster/log/gcluster/gclusterd-audit.log

3、日志样例

# Threadid=5;
# Taskid=80;
# Time: 211025 13:28:58
# End_time: 211025 13:29:00
# User@Host: root[root] @ localhost []
# UID: 1
# Query_time: 2.002046 Rows: 1
# Tables: WRITE: ; READ: ; OTHER: ; ;
# SET timestamp=1635139738;
# Sql_text: select sleep(2);
# Sql_type: DQL;
# Sql_command: SELECT;
# Status: SUCCESS;
# Connect Type: CAPI;

4、日志处理方案

构造自定义的字段分隔符和行分隔符。

4.1、将所有的#开头的字段部分,替换成字段分隔符。 样例是^GBASE^
4.2、将最后一个字段 # Connect Type: 最后增加行分隔符,样例是 ^GBASE_GBASE^
4.3 、sed 代码
cat gclusterd-audit.log |sed ‘s/# Threadid=([0-9]+);/\1/g’ |sed ‘s/# Taskid=([0-9]+);/^GBASE^\1/g’ |sed ‘s/# Time: (.+)/^GBASE^\1/g’ |sed ‘s/# End_time: (.+)/^GBASE^\1/g’ |sed ‘s/# User@Host: (.)/^GBASE^\1/g’ |sed ‘s/# UID: ([0-9]+)/^GBASE^\1/g’ |sed ‘s/# Query_time: (.) Rows: ([0-9]+)/^GBASE^\1\n^GBASE^\2/g’ |sed ‘s/# Tables: (.);/^GBASE^\1/g’ |sed ‘s/# SET timestamp=([0-9]);//g’ |sed ‘s/# # administrator command: ([0-9]);//g’ |sed ‘s/# Sql_text:(.)/^GBASE^\1/g’ |sed ‘s/# Sql_type:(.);/^GBASE^\1/g’ |sed ‘s/# Sql_command: (.)/^GBASE^\1/g’ |sed ‘s/# Status: (.)/^GBASE^\1/g’ |sed ‘s/# Connect Type: (.);/^GBASE^\1^GBASE_GBASE^/g’ > al.log

5、加载方案

5.1 表

字段名可根据情况,做修改,注意其中的Query_time ,是decimal小数类型

表名字可以随意修改,后面加载的表名注意对应上就行。

create table al(
Threadid bigint,
Taskid bigint,
Time datetime,
End_time datetime,
User_Host varchar(1000),
UID bigint,
Query_time decimal(16,6),
Rows bigint,
Tables_list varchar(8000),
Sql_text varchar(8000),
Sql_command varchar(100),
Sql_type varchar(100),
Status varchar(100),
conn_type varchar(100)
);

5.2 加载SQL

load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s';s
Query OK, 7 rows affected (Elapsed: 00:00:01.72)

完整脚本

注意里面infile数据源的部分,IP,操作系统用户名和密码,路径等,需要根据实际情况修改脚本。

[gbase@rh6-1 audit_log]$ cat auditlog_load.sh
logfile=$1
username=$2
password=$3
dbname=$4
tablename=$5

cat $logfile |sed 's/# Threadid=\([0-9]\+\);/\1/g' |sed 's/# Taskid=\([0-9]\+\);/^GBASE^\1/g' |sed 's/# Time: \(.\+\)/^GBASE^\1/g' |sed 's/# End_time: \(.\+\)/^GBASE^\1/g' |sed 's/# User@Host: \(.*\)/^GBASE^\1/g' |sed 's/# UID: \([0-9]\+\)/^GBASE^\1/g' |sed 's/# Query_time: \(.*\) Rows: \([0-9]\+\)/^GBASE^\1\n^GBASE^\2/g' |sed 's/# Tables: \(.*\);/^GBASE^\1/g' |sed 's/# SET timestamp=\([0-9]*\);//g' |sed 's/# # administrator command: \([0-9]*\);//g' |sed 's/# Sql_text:\(.*\)/^GBASE^\1/g' |sed 's/# Sql_type:\(.*\);/^GBASE^\1/g' |sed 's/# Sql_command: \(.*\)/^GBASE^\1/g' |sed 's/# Status: \(.*\)/^GBASE^\1/g' |sed 's/# Connect Type: \(.*\);/^GBASE^\1^GBASE_GBASE^/g'  > tmp_auditlog_load.log

gccli -vvv  -u${username} -p${password} -e"load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/tmp_auditlog_load.log' into table ${dbname}.${tablename} fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s'"

运行样例

[gbase@rh6-1 audit_log]$ sh auditlog_load.sh gclusterd-audit.log gbase gbase20110531 al al
--------------
load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/tmp_auditlog_load.log' into table al.al fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s'
--------------

Query OK, 7 rows affected (Elapsed: 00:00:01.73)
Task 73 finished, Loaded 7 records, Skipped 28 records

Bye

6、注意事项和限制

所有从客户端发来的SQL,都能被加载,而数据库内部互相发的SQL, 从原始数据看,没有TIME字段,只有END_TIME
比如

# Threadid=52;
# Taskid=0;
# End_time: 211025 13:00:01
# User@Host: root[root] @  [10.0.2.201]
# UID: 1
# Query_time: 0.000228 Rows: 0
# SET timestamp=1635138001;
# administrator command: Quit;
# Sql_type: OTHERS;
# Sql_command: Quit;
# Status: SUCCESS;
# Connect Type: CAPI;

如果想这类内部SQL也加载,只能

1、牺牲掉其它SQL的TIME字段,sed时替换成空
2、分2次加载,第一次带TIME字段,其它的为错误数据,第二次不带TIME字段。
3、其它的没加载成功的,就算了吧

gbase> load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '$GBASE$' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '$GBASE_GBASE$' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 7 rows affected (Elapsed: 00:00:01.62)
Task 74 finished, Loaded 7 records, Skipped 28 records

gbase> load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '$GBASE$' TABLE_FIELDS 'Threadid,Taskid,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '$GBASE_GBASE$' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 18 rows affected (Elapsed: 00:00:01.39)
Task 75 finished, Loaded 18 records, Skipped 17 records