南大通用GBase 8a通过python UDF从客户端执行SQL语句

GBase 8a 数据库集群从862Build43开始,支持python的UDF自定义函数。本文通过python UDF在命令行调用gccli来执行SQL语句,包括某些不允许在存储过程中执行的语句。

python自定义函数

-- executePythonSQL(sql_str): run one SQL statement through the gccli
-- command-line client and return whatever the client printed.
--
-- Parameters:
--   sql_str  statement text, handed to gccli through its -e option.
-- Returns:
--   the gccli output (stdout and stderr merged, one trailing newline removed),
--   or NULL when sql_str is NULL or gccli could not be run.
--
-- gccli is started directly with an argument list instead of through a shell
-- command line. Double quotes, backticks, dollar signs and backslashes inside
-- sql_str therefore reach gccli unchanged and cannot be interpreted as shell
-- syntax.
-- NOTE(review): the account and password are hard-coded as command-line
-- arguments of gccli. Restrict who may execute this function.
drop function if exists executePythonSQL;
create function executePythonSQL(sql_str varchar(8000))
returns varchar
$$
try:
    import subprocess
    if sql_str is None:
        return None
    gccli_proc = subprocess.Popen(
        ["gccli", "-ugbase", "-pgbase20110531", "-N", "-vvv", "-e" + sql_str],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    output = gccli_proc.communicate()[0]
    if output.endswith(chr(10)):
        output = output[:-1]
except Exception:
    return None
return str(output)

$$ language plpythonu;

执行效果

gbase> select executePythonSQL('load data infile ''sftp://gbase:gbase1234@10.0.2.101/home/gbase/t.txt'' into table testdb.t1 fields terminated by '',''') a;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| a                                                                                                                                                                                                                                                                |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| --------------
load data infile 'sftp://gbase:gbase1234@10.0.2.101/home/gbase/t.txt' into table testdb.t1 fields terminated by ','
--------------

Query OK, 0 rows affected (Elapsed: 00:00:01.29)
Task 6154 finished, Loaded 0 records, Skipped 1 records

Bye |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:01.32)

gbase> select executePythonSQL('select now()');
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| executePythonSQL('select now()')                                                                                                                             |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| --------------
select now()
--------------

+---------------------+
| 2020-11-25 10:25:17 |
+---------------------+
1 row in set (Elapsed: 00:00:00.00)

Bye |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:00.04)

gbase>

审计日志归档的应用样例

-- executePythonSQL(sql_str): run one SQL statement through the gccli
-- command-line client and return whatever the client printed.
--
-- Parameters:
--   sql_str  statement text, handed to gccli through its -e option.
-- Returns:
--   the gccli output (stdout and stderr merged, one trailing newline removed),
--   or NULL when sql_str is NULL or gccli could not be run.
--
-- gccli is started directly with an argument list instead of through a shell
-- command line. Double quotes, backticks, dollar signs and backslashes inside
-- sql_str therefore reach gccli unchanged and cannot be interpreted as shell
-- syntax.
-- NOTE(review): the account and password are hard-coded as command-line
-- arguments of gccli. Restrict who may execute this function.
drop function if exists executePythonSQL;
create function executePythonSQL(sql_str varchar(8000))
returns varchar
$$
try:
    import subprocess
    if sql_str is None:
        return None
    gccli_proc = subprocess.Popen(
        ["gccli", "-ugbase", "-pgbase20110531", "-N", "-vvv", "-e" + sql_str],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    output = gccli_proc.communicate()[0]
    if output.endswith(chr(10)):
        output = output[:-1]
except Exception:
    return None
return str(output)

$$ language plpythonu;

-- 
-- 此功能一定要确保每个节点,包括管理和数据,主机名不同,并全部配置了DNS,能按照主机名访问,否则加载会报错找不到服务器或文件
-- 
-- audit_log_archive(): export the rows of gbase.audit_log_bak1 to a text file
-- under /home/gbase, then load that file into gclusterdb.audit_log_express.
-- The load is issued through executePythonSQL (a gccli client call) instead of
-- as a statement of this procedure, presumably because load data is not
-- allowed inside a stored procedure (TODO confirm).
-- Takes no parameters. The result set of the final select carries the gccli
-- output of the load.
-- NOTE(review): the select below exports 16 columns, while the
-- audit_log_express definition in the import_audit_log event has 18 columns
-- (it adds end_time, user_host, db and algorithms and has no uid or
-- operators). Confirm both against the gbase.audit_log layout of the
-- installed version.
drop procedure if exists audit_log_archive;
delimiter //
create procedure audit_log_archive()
begin
    select
      @@hostname as hostname,
      thread_id,
      taskid,
      start_time,
      uid,
      user,
      host_ip,
      query_time,
      rows,
      -- substr positions start at 1. A start position of 0 returns an empty
      -- string under MySQL-style semantics.
      substr(table_list, 1, 4096),
      substr(sql_text, 1, 8191),   -- original note: utf8mb4 charset
      sql_type,
      sql_command,
      operators,
      status,
      conn_type
    from gbase.audit_log_bak1
    into outfile '/home/gbase/audit_log.txt' -- pick a suitable file path
    fields terminated by '\xFF\xFE\xFD' -- custom column separator made of several non-printable bytes
    lines terminated by '\xFF\xFD\xFE' -- custom row separator made of several non-printable bytes
    writemode by overwrites;  -- overwrite an existing file

    -- Quoting rules for the statement text passed to executePythonSQL:
    --   * single quotes that belong to the inner statement are written twice
    --   * every backslash of the inner statement is written as two backslashes
    -- The pieces below concatenate to one load data statement.
    select executePythonSQL(concat(
      'load data infile ''sftp://gbase:gbase1234@',
      @@hostname,  -- host name is used here, so every node needs a distinct, DNS-resolvable host name
      '/home/gbase/audit_log.txt''',
      ' into table gclusterdb.audit_log_express',
      ' fields terminated by ''\\xFF\\xFE\\xFD''',
      ' lines terminated by ''\\xFF\\xFD\\xFE''',
      ' time format ''%H:%i:%s.%f'''
    ));

end //
delimiter ;

-- import_audit_log: scheduled every 10 minutes. Each run swaps gbase.audit_log
-- for a freshly created empty copy and archives the rows collected so far by
-- calling audit_log_archive().
-- Any SQL exception ends the run and is recorded in import_audit_log_errors.
-- NOTE(review): the SELF and LOCAL keywords presumably limit the statements
-- and the event to the node that runs them. Confirm against the GBase 8a
-- manual.
-- NOTE(review): executePythonSQL reports a failed load as text or NULL, not as
-- an SQL exception, so the final drop of gbase.audit_log_bak1 also runs when
-- the load did not succeed. Check the returned text before relying on this in
-- production.
drop event if exists import_audit_log;
delimiter //
create event "import_audit_log" on schedule every 10 minute enable local do
begin
    declare errno int;
    declare msg text;
    declare exit handler for sqlexception
    begin
        get diagnostics condition 1 errno = gbase_errno, msg = message_text;
        create table if not exists import_audit_log_errors (
            err_time datetime,
            hostname varchar(64),
            err_no int,
            msg_txt varchar(1024)
        ) charset = utf8;
        -- substr positions start at 1. A start position of 0 returns an empty
        -- string under MySQL-style semantics.
        insert into import_audit_log_errors (err_time, hostname, err_no, msg_txt)
        values (now(), @@hostname, errno, substr(msg, 1, 1024));
    end;

    -- Remove leftovers of an earlier run.
    drop self table if exists gbase.audit_log_bak1;
    drop self table if exists gbase.audit_log_bak2;

    -- Archive target. Qualified with gclusterdb because audit_log_archive
    -- loads into gclusterdb.audit_log_express.
    create table if not exists gclusterdb.audit_log_express (
        hostname varchar(512) default null,
        thread_id int(11) default null,
        taskid bigint(20) default null,
        start_time datetime default null,
        end_time datetime default null,
        user_host text,
        user text,
        host_ip text,
        query_time time default null,
        rows bigint(20) default null,
        db varchar(512) default null,
        table_list text,
        sql_text text,
        sql_type text,
        sql_command text,
        algorithms text,
        status text,
        conn_type text
    ) charset = utf8;

    -- Create an empty copy of the audit table with sql_mode cleared, then
    -- restore the default sql_mode.
    set self sql_mode = '';
    create self table gbase.audit_log_bak2 like gbase.audit_log;
    set self sql_mode = default;

    -- One rename statement: the filled table becomes audit_log_bak1 and the
    -- empty copy takes over as gbase.audit_log.
    rename self table
        gbase.audit_log to gbase.audit_log_bak1,
        gbase.audit_log_bak2 to gbase.audit_log;

    call audit_log_archive();

    drop self table gbase.audit_log_bak1;
end//
delimiter ;