MySQL在ROW模式下通过binlog提取SQL语句

Linux
基于row模式的binlog,生成DML(insert/update/delete)的rollback语句
通过MySQLbinlog -v 解析binlog生成可读的sql文件
提取需要处理的有效sql
  "### "开头的行.如果输入的start-position位于某个event group中间,则会导致"无法识别event"错误


将INSERT/UPDATE/DELETE 的sql反转,并且1个完整sql只能占1行
  INSERT: INSERT INTO => DELETE FROM, SET => WHERE
  UPDATE: WHERE => SET, SET => WHERE
  DELETE: DELETE FROM => INSERT INTO, WHERE => SET
用列名替换位置@{1,2,3}
  通过desc table获得列顺序及对应的列名
  特殊列类型value做特别处理
逆序

注意:
  表结构与现在的表结构必须相同[谨记]
  由于row模式是幂等的,并且恢复是一次性,所以只提取sql,不提取BEGIN/COMMIT
  只能对INSERT/UPDATE/DELETE进行处理

mysql> select * from yoon;
+----------+------------+-----------+---------------------+
| actor_id | first_name | last_name | last_update         |
+----------+------------+-----------+---------------------+
|        1 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        2 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        3 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        4 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        5 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        6 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        7 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        8 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        9 | HANK       | YOON      | 2006-02-15 04:34:33 |
|       10 | HANK       | YOON      | 2006-02-15 04:34:33 |
|       11 | HANK       | YOON      | 2006-02-15 04:34:33 |
+----------+------------+-----------+---------------------+
11 rows in set (0.00 sec)


mysql> delete from yoon;
Query OK, 11 rows affected (1.03 sec)


mysql> select * from yoon;
Empty set (0.00 sec)

命令之间的空格一定要注意,否则就会无法提取SQL语句:
[root@hank-yoon data]# perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/export/data/mysql/data/yoon.sql' -u 'root' -p 'yoon'
Warning: Using a password on the command line interface can be insecure.
[root@hank-yoon data]# ls
auto.cnf            hank     ibdata2      ib_logfile1  modify.pl  mysql-bin.000001  performance_schema  test  yoon.sql
binlog-rollback.pl  ibdata1  ib_logfile0  ib_logfile2  mysql      mysql-bin.index   sakila              yoon
[root@hank-yoon data]# cat yoon.sql 
INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=10, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=9, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=8, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=7, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=6, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=5, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=4, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=3, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=2, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=1, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);

mysql> INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
Query OK, 1 row affected (0.01 sec)


mysql> select * from yoon;
+----------+------------+-----------+---------------------+
| actor_id | first_name | last_name | last_update         |
+----------+------------+-----------+---------------------+
|       11 | HANK       | YOON      | 2006-02-15 04:34:33 |
+----------+------------+-----------+---------------------+

#!/usr/lib/perl -w

use strict;

use warnings;

use Class::Struct;

use Getopt::Long qw(:config no_ignore_case);                    # GetOption

# register handler system signals

use sigtrap 'handler', \&sig_int, 'normal-signals';

# catch signal

sub sig_int(){

    my ($signals) = @_;

    print STDERR "# Caught SIG$signals.\n";

    exit 1;

}

my %opt;

my $srcfile;

my $host = '127.0.0.1';

my $port = 3306;

my ($user,$pwd);

my ($MYSQL, $MYSQLBINLOG, $ROLLBACK_DML);

my $outfile = '/dev/null';

my (%do_dbs,%do_tbs);

# tbname=>tbcol, tbcol: @n=>colname,type

my %tbcol_pos;

my $SPLITER_COL = ',';

my $SQLTYPE_IST = 'INSERT';

my $SQLTYPE_UPD = 'UPDATE';

my $SQLTYPE_DEL = 'DELETE';

my $SQLAREA_WHERE = 'WHERE';

my $SQLAREA_SET = 'SET';

my $PRE_FUNCT = '========================== ';

# =========================================================

# 基于row模式的binlog,生成DML(insert/update/delete)的rollback语句

# 通过mysqlbinlog -v 解析binlog生成可读的sql文件

# 提取需要处理的有效sql

#     "### "开头的行.如果输入的start-position位于某个event group中间,则会导致"无法识别event"错误

#

# 将INSERT/UPDATE/DELETE 的sql反转,并且1个完整sql只能占1行

#     INSERT: INSERT INTO => DELETE FROM, SET => WHERE

#     UPDATE: WHERE => SET, SET => WHERE

#     DELETE: DELETE FROM => INSERT INTO, WHERE => SET

# 用列名替换位置@{1,2,3}

#     通过desc table获得列顺序及对应的列名

#     特殊列类型value做特别处理

# 逆序

# 注意:

#     表结构与现在的表结构必须相同[谨记]

#     由于row模式是幂等的,并且恢复是一次性,所以只提取sql,不提取BEGIN/COMMIT

#     只能对INSERT/UPDATE/DELETE进行处理

# ========================================================

sub main{

    # get input option

    &get_options();

    # 

    &init_tbcol();

    #

    &do_binlog_rollback();

}

&main();

# ----------------------------------------------------------------------------------------

# Func : get options and set option flag 

# ----------------------------------------------------------------------------------------

sub get_options{

    #Get options info

    GetOptions(\%opt,

        'help',                    # OUT : print help info 

        'f|srcfile=s',            # IN : binlog file

        'o|outfile=s',            # out : output sql file

        'h|host=s',                # IN : host

        'u|user=s', # IN : user

        'p|password=s', # IN : password

        'P|port=i',                # IN : port

        'start-datetime=s',        # IN : start datetime

        'stop-datetime=s',        # IN : stop datetime

        'start-position=i',        # IN : start position

        'stop-position=i',        # IN : stop position

        'd|database=s',            # IN : database, split comma

        'T|table=s',            # IN : table, split comma

        'i|ignore',                # IN : ignore binlog check ddl and so on

        'debug',                # IN : print debug information

     ) or print_usage();

    if (!scalar(%opt)) {

        &print_usage();

    }

    # Handle for options

    if ($opt{'f'}){

        $srcfile = $opt{'f'};

    }else{

        &merror("please input binlog file");

    }

    $opt{'h'} and $host = $opt{'h'};

    $opt{'u'} and $user = $opt{'u'};

    $opt{'p'} and $pwd = $opt{'p'};

    $opt{'P'} and $port = $opt{'P'};

    if ($opt{'o'}) {

        $outfile = $opt{'o'};

        # 清空 outfile

        `echo '' > $outfile`;

    }

    # 

    $MYSQL = qq{mysql -h$host -u$user -p'$pwd' -P$port};

    &mdebug("get_options::MYSQL\n\t$MYSQL");

    # 提取binlog,不需要显示列定义信息,用-v,而不用-vv

    $MYSQLBINLOG = qq{mysqlbinlog -v};

    $MYSQLBINLOG .= " --start-position=".$opt{'start-position'} if $opt{'start-position'};

    $MYSQLBINLOG .= " --stop-position=".$opt{'stop-position'} if $opt{'stop-postion'};

    $MYSQLBINLOG .= " --start-datetime='".$opt{'start-datetime'}."'" if $opt{'start-datetime'};

    $MYSQLBINLOG .= " --stop-datetime='$opt{'stop-datetime'}'" if $opt{'stop-datetime'};

    $MYSQLBINLOG .= " $srcfile";

    &mdebug("get_options::MYSQLBINLOG\n\t$MYSQLBINLOG");

    # 检查binlog中是否含有 ddl sql: CREATE|ALTER|DROP|RENAME

    &check_binlog() unless ($opt{'i'});

    # 不使用mysqlbinlog过滤,USE dbname;方式可能会漏掉某些sql,所以不在mysqlbinlog过滤

    # 指定数据库

    if ($opt{'d'}){

        my @dbs = split(/,/,$opt{'d'});

        foreach my $db (@dbs){

            $do_dbs{$db}=1;

        }

    }

    # 指定表

    if ($opt{'T'}){

        my @tbs = split(/,/,$opt{'T'});

        foreach my $tb (@tbs){

            $do_tbs{$tb}=1;

        }

    }

    # 提取有效DML SQL

    $ROLLBACK_DML = $MYSQLBINLOG." | grep '^### '";

    # 去掉注释: '### ' -> ''

    # 删除首尾空格

    $ROLLBACK_DML .= " | sed 's/###\\s*//g;s/\\s*\$//g'";

    &mdebug("rollback dml\n\t$ROLLBACK_DML");

   

    # 检查内容是否为空

    my $cmd = "$ROLLBACK_DML | wc -l";

    &mdebug("check contain dml sql\n\t$cmd");

    my $size = `$cmd`;

    chomp($size);

    unless ($size >0){

        &merror("binlog DML is empty:$ROLLBACK_DML");

    };

}   

# ----------------------------------------------------------------------------------------

# Func : check binlog contain DDL

# ----------------------------------------------------------------------------------------

sub check_binlog{

    &mdebug("$PRE_FUNCT check_binlog");

    my $cmd = "$MYSQLBINLOG ";

    $cmd .= " | grep -E -i '^(CREATE|ALTER|DROP|RENAME)' ";

    &mdebug("check binlog has DDL cmd\n\t$cmd");

    my $ddlcnt = `$cmd`;

    chomp($ddlcnt);

    my $ddlnum = `$cmd | wc -l`;

    chomp($ddlnum);

    my $res = 0;

    if ($ddlnum>0){

        # 在ddl sql前面加上前缀<DDL>

        $ddlcnt = `echo '$ddlcnt' | sed 's/^//g'`;

        &merror("binlog contain $ddlnum DDL:$MYSQLBINLOG. ddl sql:\n$ddlcnt");

    }

    return $res;

}

# ----------------------------------------------------------------------------------------

# Func : init all table column order

#        if input --database --table params, only get set table column order

# ----------------------------------------------------------------------------------------

sub init_tbcol{

    &mdebug("$PRE_FUNCT init_tbcol");

    # 提取DML语句

    my $cmd .= "$ROLLBACK_DML | grep -E '^(INSERT|UPDATE|DELETE)'";

    # 提取表名,并去重

    #$cmd .= " | awk '{if (\$1 ~ \"^UPDATE\") {print \$2}else {print \$3}}' | uniq ";

    $cmd .= " | awk '{if (\$1 ~ \"^UPDATE\") {print \$2}else {print \$3}}' | sort | uniq ";

    &mdebug("get table name cmd\n\t$cmd");

    open ALLTABLE, "$cmd | " or die "can't open file:$cmd\n";

    while (my $tbname = <ALLTABLE>){

        chomp($tbname);

        #if (exists $tbcol_pos{$tbname}){

        #    next;

        #}

        &init_one_tbcol($tbname) unless (&ignore_tb($tbname));

       

    }

    close ALLTABLE or die "can't close file:$cmd\n";

    # init tb col

    foreach my $tb (keys %tbcol_pos){

        &mdebug("tbname->$tb");

        my %colpos = %{$tbcol_pos{$tb}};

        foreach my $pos (keys %colpos){

            my $col = $colpos{$pos};

            my ($cname,$ctype) = split(/$SPLITER_COL/, $col);

            &mdebug("\tpos->$pos,cname->$cname,ctype->$ctype");

        }

    }

};

# ----------------------------------------------------------------------------------------

# Func : init one table column order

# ----------------------------------------------------------------------------------------

sub init_one_tbcol{

    my $tbname = shift;

    &mdebug("$PRE_FUNCT init_one_tbcol");

    # 获取表结构及列顺序

    my $cmd = $MYSQL." --skip-column-names --silent -e 'desc $tbname'";

    # 提取列名,并拼接

    $cmd .= " | awk -F\'\\t\' \'{print NR\"$SPLITER_COL`\"\$1\"`$SPLITER_COL\"\$2}'";

    &mdebug("get table column infor cmd\n\t$cmd");

    open TBCOL,"$cmd | " or die "can't open desc $tbname;";

    my %colpos;

    while (my $line = <TBCOL>){

        chomp($line);

        my ($pos,$col,$coltype) = split(/$SPLITER_COL/,$line);

        &mdebug("linesss=$line\n\t\tpos=$pos\n\t\tcol=$col\n\t\ttype=$coltype");

        $colpos{$pos} = $col.$SPLITER_COL.$coltype;

    }

    close TBCOL or die "can't colse desc $tbname";

    $tbcol_pos{$tbname} = \%colpos;

}

# ----------------------------------------------------------------------------------------

# Func : rollback sql:    INSERT/UPDATE/DELETE

# ----------------------------------------------------------------------------------------

sub do_binlog_rollback{

    my $binlogfile = "$ROLLBACK_DML ";

    &mdebug("$PRE_FUNCT do_binlog_rollback");

    # INSERT|UPDATE|DELETE

    my $sqltype;

    # WHERE|SET

    my $sqlarea;

   

    my ($tbname, $sqlstr) = ('', '');

    my ($notignore, $isareabegin) = (0,0);

    # output sql file

    open SQLFILE, ">> $outfile" or die "Can't open sql file:$outfile";

    # binlog file

    open BINLOG, "$binlogfile |" or die "Can't open file: $binlogfile";

    while (my $line = <BINLOG>){

        chomp($line);

        if ($line =~ /^(INSERT|UPDATE|DELETE)/){

            # export sql

            if ($sqlstr ne ''){

                $sqlstr .= ";\n";

                print SQLFILE $sqlstr;

                &mdebug("export sql\n\t".$sqlstr);

                $sqlstr = '';

            }

            if ($line =~ /^INSERT/){

                $sqltype = $SQLTYPE_IST;

                $tbname = `echo '$line' | awk '{print \$3}'`;

                chomp($tbname);

                $sqlstr = qq{DELETE FROM $tbname};

            }elsif ($line =~ /^UPDATE/){

                $sqltype = $SQLTYPE_UPD;

                $tbname = `echo '$line' | awk '{print \$2}'`;

                chomp($tbname);

                $sqlstr = qq{UPDATE $tbname};

            }elsif ($line =~ /^DELETE/){

                $sqltype = $SQLTYPE_DEL;   

                $tbname = `echo '$line' | awk '{print \$3}'`;

                chomp($tbname);

                $sqlstr = qq{INSERT INTO $tbname};

            }

            # check ignore table

            if(&ignore_tb($tbname)){

                $notignore = 0;

                &mdebug("#IGNORE#:line:".$line);

                $sqlstr = '';

            }else{

                $notignore = 1;

                &mdebug("#DO#:line:".$line);

            }

        }else {

            if($notignore){

                &merror("can't get tbname") unless (defined($tbname));

                if ($line =~ /^WHERE/){

                    $sqlarea = $SQLAREA_WHERE;

                    $sqlstr .= qq{ SET};

                    $isareabegin = 1;

                }elsif ($line =~ /^SET/){

                    $sqlarea = $SQLAREA_SET;

                    $sqlstr .= qq{ WHERE};

                    $isareabegin = 1;

                }elsif ($line =~ /^\@/){

                    $sqlstr .= &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);

                    $isareabegin = 0;

                }else{

                    &mdebug("::unknown sql:".$line);

                }

            }

        }

    }

    # export last sql

    if ($sqlstr ne ''){

        $sqlstr .= ";\n";

        print SQLFILE $sqlstr;

        &mdebug("export sql\n\t".$sqlstr);

    }

   

    close BINLOG or die "Can't close binlog file: $binlogfile";

    close SQLFILE or die "Can't close out sql file: $outfile";

    # 逆序

    # 1!G: 只有第一行不执行G, 将hold space中的内容append回到pattern space

    # h: 将pattern space 拷贝到hold space

    # $!d: 除最后一行都删除

    my $invert = "sed -i '1!G;h;\$!d' $outfile";

    my $res = `$invert`;

    &mdebug("inverter order sqlfile :$invert");

}

# ----------------------------------------------------------------------------------------

# Func : transfer column pos to name

#    deal column value

#

# &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);

# ----------------------------------------------------------------------------------------

sub deal_col_value($$$$$){

    my ($tbname, $sqltype, $sqlarea, $isareabegin, $line) = @_;

    &mdebug("$PRE_FUNCT deal_col_value");

    &mdebug("input:tbname->$tbname,type->$sqltype,area->$sqlarea,areabegin->$isareabegin,line->$line");

    my @vals = split(/=/, $line);

    my $pos = substr($vals[0],1);

    my $valstartpos = length($pos)+2;

    my $val = substr($line,$valstartpos);

    my %tbcol = %{$tbcol_pos{$tbname}};

    my ($cname,$ctype) = split(/$SPLITER_COL/,$tbcol{$pos});

    &merror("can't get $tbname column $cname type") unless (defined($cname) || defined($ctype));

    &mdebug("column infor:cname->$cname,type->$ctype");

    # join str

    my $joinstr;

    if ($isareabegin){

        $joinstr = ' ';

    }else{

        # WHERE 被替换为 SET, 使用 , 连接

        if ($sqlarea eq $SQLAREA_WHERE){

            $joinstr = ', ';

        # SET 被替换为 WHERE 使用 AND 连接

        }elsif ($sqlarea eq $SQLAREA_SET){

            $joinstr = ' AND ';

        }else{

            &merror("!!!!!!The scripts error");

        }

    }

   

    # 

    my $newline = $joinstr;

    # NULL value

    if (($val eq 'NULL') && ($sqlarea eq $SQLAREA_SET)){

        $newline .= qq{ $cname IS NULL};

    }else{

        # timestamp: record seconds

        if ($ctype eq 'timestamp'){

            $newline .= qq{$cname=from_unixtime($val)};

        # datetime: @n=yyyy-mm-dd hh::ii::ss

        }elsif ($ctype eq 'datetime'){

            $newline .= qq{$cname='$val'};

        }else{

            $newline .= qq{$cname=$val};

        }

    }

    &mdebug("\told>$line\n\tnew>$newline");

   

    return $newline;

}

# ----------------------------------------------------------------------------------------

# Func : check is ignore table

# params: IN table full name # format:`dbname`.`tbname`

# RETURN:

#        0 not ignore

#        1 ignore

# ----------------------------------------------------------------------------------------

sub ignore_tb($){

    my $fullname = shift;

    # 删除`

    $fullname =~ s/`//g;

    my ($dbname,$tbname) = split(/\./,$fullname);

    my $res = 0;

   

    # 指定了数据库

    if ($opt{'d'}){

        # 与指定库相同

        if ($do_dbs{$dbname}){

            # 指定表

            if ($opt{'T'}){

                # 与指定表不同

                unless ($do_tbs{$tbname}){

                    $res = 1;

                }

            }

        # 与指定库不同

        }else{

            $res = 1;

        }

    }

    #&mdebug("Table check ignore:$fullname->$res");

    return $res;

}

# ----------------------------------------------------------------------------------------

# Func : print debug msg

# ----------------------------------------------------------------------------------------

sub mdebug{

    my (@msg) = @_;

    print "@msg\n" if ($opt{'debug'});

}

# ----------------------------------------------------------------------------------------

# Func : print error msg and exit

# ----------------------------------------------------------------------------------------

sub merror{

    my (@msg) = @_;

    print ":@msg\n";

    &print_usage();

    exit(1);

}

# ----------------------------------------------------------------------------------------

# Func : print usage

# ----------------------------------------------------------------------------------------

sub print_usage{

    print <<EOF;

==========================================================================================

Command line options :

    --help                # OUT : print help info 

    -f, --srcfile            # IN : binlog file. [required]

    -o, --outfile            # OUT : output sql file. [required]

    -h, --host            # IN : host. default '127.0.0.1'

    -u, --user            # IN : user. [required]

    -p, --password            # IN : password. [required] 

    -P, --port            # IN : port. default '3306'

    --start-datetime        # IN : start datetime

    --stop-datetime            # IN : stop datetime

    --start-position        # IN : start position

    --stop-position            # IN : stop position

    -d, --database            # IN : database, split comma

    -T, --table            # IN : table, split comma. [required] set -d

    -i, --ignore            # IN : ignore binlog check contain DDL(CREATE|ALTER|DROP|RENAME)

    --debug                # IN : print debug information

Sample :

   shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' 

   shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' -i

   shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' --debug

   shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -h '192.168.1.2' -u 'user' -p 'pwd' -P 3307

   shell> perl binlog-rollback.pl -f 'mysql-bin.000001' -o '/tmp/t.sql' -u 'user' -p 'pwd' --start-position=107

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/ed581986558c5349841d5ad10357f517.html