From d866344f1d7ecc4093c6180a0ea55be20d684d7c Mon Sep 17 00:00:00 2001 From: danfengcao Date: Sun, 15 Jan 2017 20:56:57 +0800 Subject: [PATCH 01/22] fix requirements name --- example/mysql-flashback-priciple-and-practice.md | 2 +- requirments.txt => requirements.txt | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename requirments.txt => requirements.txt (100%) diff --git a/example/mysql-flashback-priciple-and-practice.md b/example/mysql-flashback-priciple-and-practice.md index 65fb978..58ba2dd 100644 --- a/example/mysql-flashback-priciple-and-practice.md +++ b/example/mysql-flashback-priciple-and-practice.md @@ -188,7 +188,7 @@ INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2014-11-11 00:04:48' * 闪回的关键是快速筛选出真正需要回滚的SQL。 * 先根据库、表、时间做一次过滤,再根据位置做更准确的过滤。 * 由于数据一直在写入,要确保回滚sql中不包含其他数据。可根据是否是同一事务、误操作行数、字段值的特征等等来帮助判断。 -* 执行回滚sql时如有报错,需要查实具体原因,一般是因为对应的数据已发生变化。由于是严格的行模式,只要有主键或唯一键存在,就只会报某条数据不存在的错,不必担心会更新不该操作的数据。 +* 执行回滚sql时如有报错,需要查实具体原因,一般是因为对应的数据已发生变化。由于是严格的行模式,只要有唯一键(包括主键)存在,就只会报某条数据不存在的错,不必担心会更新不该操作的数据。 * 如果待回滚的表与其他表有关联,要与开发说明回滚和不回滚各自的副作用,再确定方案。 * 回滚后数据变化,可能对用户和线上应用造成困惑(类似幻读)。 diff --git a/requirments.txt b/requirements.txt similarity index 100% rename from requirments.txt rename to requirements.txt From 41ff90e13e2c25b0c2c79512c61e40f01759c2d7 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Thu, 9 Feb 2017 15:01:13 +0800 Subject: [PATCH 02/22] fix: parse help --- binlog2sql/binlog2sql_util.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 9338e62..c9149a2 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -72,10 +72,11 @@ def parse_args(args): help='Generate insert sql without primary key if exists', default=False) parser.add_argument('-B', '--flashback', dest='flashback', action='store_true', help='Flashback data to start_postition of start_file', default=False) - return parser.parse_args(args) + return parser def command_line_args(args): - args = parse_args(args) + parser = parse_args(args) + args = parser.parse_args(args) if args.help: parser.print_help() sys.exit(1) From e5bb1203fadf3dda45806546692a9f3f7f729d84 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Mon, 27 Feb 2017 16:45:43 +0800 Subject: [PATCH 03/22] modify README --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 40e0bec..7230c3b 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ binlog2sql shell> git clone https://github.com/danfengcao/binlog2sql.git && cd binlog2sql shell> pip install -r requirements.txt ``` +git与pip的安装问题请自行搜索解决。 使用 ========= @@ -36,8 +37,9 @@ shell> pip install -r requirements.txt [mysqld] server_id = 1 log_bin = /var/log/mysql/mysql-bin.log - max_binlog_size = 100M + max_binlog_size = 1G binlog_format = row + binlog_row_image = full ### user需要的最小权限集合: @@ -191,6 +193,7 @@ INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', ###限制 * mysql server必须开启,离线模式下不能解析 +* 参数 _binlog\_row\_image_ 必须为FULL,暂不支持MINIMAL ###优点(对比mysqlbinlog) From f5b213c8efdf4507084ff405611855ac10078714 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Fri, 10 Mar 2017 18:03:41 +0800 Subject: [PATCH 04/22] print help if args is NULL --- binlog2sql/binlog2sql_util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index c9149a2..c3c5f95 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -75,9 +75,10 @@ 
def parse_args(args): return parser def command_line_args(args): + needPrintHelp = False if args else True parser = parse_args(args) args = parser.parse_args(args) - if args.help: + if args.help or needPrintHelp: parser.print_help() sys.exit(1) if not args.startFile: From 8d6456f70d7c1e34d957e378b00fb7d7cf77427b Mon Sep 17 00:00:00 2001 From: danfengcao Date: Fri, 10 Mar 2017 18:07:39 +0800 Subject: [PATCH 05/22] sleep 1s per 1000 rows --- README.md | 2 +- binlog2sql/binlog2sql.py | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7230c3b..d67c257 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` -K, --no-primary-key 对INSERT语句去除主键。可选。 --B, --flashback 生成回滚语句,可解析大文件,不受内存限制。可选。与stop-never或no-primary-key不能同时添加。 +-B, --flashback 生成回滚语句,可解析大文件,不受内存限制,每打印一千行加一句SLEEP SELECT(1)。可选。与stop-never或no-primary-key不能同时添加。 **解析范围控制** diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index a343433..53d029d 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -100,16 +100,28 @@ def process_binlog(self): if flagLastEvent: break ftmp.close() + if self.flashback: - with open(tmpFile) as ftmp: - for line in reversed_lines(ftmp): - print line.rstrip() + self.print_rollback_sql(tmpFile) finally: os.remove(tmpFile) cur.close() stream.close() return True + def print_rollback_sql(self, fin): + '''print rollback sql from tmpfile''' + with open(fin) as ftmp: + sleepInterval = 1000 + i = 0 + for line in reversed_lines(ftmp): + print line.rstrip() + if i >= sleepInterval: + print 'SELECT SLEEP(1);' + i = 0 + else: + i += 1 + def __del__(self): pass From 97a5802b01c13b1f6da70535287037dfe9e5d101 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Fri, 7 Apr 2017 16:47:44 +0800 Subject: [PATCH 06/22] fix README --- README.md | 86 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index d67c257..139c1cf 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,9 @@ git与pip的安装问题请自行搜索解决。 * super/replication client:两个权限都可以,需要执行'SHOW MASTER STATUS', 获取server端的binlog列表 * replication slave:通过BINLOG_DUMP协议获取binlog内容的权限 -###基本用法 + +### 基本用法 + **解析出标准SQL** @@ -77,7 +79,9 @@ shell> python binlog2sql.py --flashback -h127.0.0.1 -P3306 -uadmin -p'admin' -dt INSERT INTO `test`.`test3`(`addtime`, `data`, `id`) VALUES ('2016-12-10 13:03:38', 'english', 4); #start 981 end 1147 UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id`=3 WHERE `addtime`='2016-12-10 12:00:00' AND `data`='中文' AND `id`=3 LIMIT 1; #start 763 end 954 ``` -###选项 + +### 选项 + **mysql连接配置** -h host; -P port; -u user; -p password @@ -110,7 +114,7 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` -t, --tables 只输出目标tables的sql。可选。默认为空。 -###应用案例 +### 应用案例 #### **误删整张表数据,需要紧急回滚** @@ -142,38 +146,38 @@ Empty set (0.00 sec) 1. 登录mysql,查看目前的binlog文件 ```bash -mysql> show master status; -+------------------+-----------+ -| Log_name | File_size | -+------------------+-----------+ -| mysql-bin.000051 | 967 | -| mysql-bin.000052 | 965 | -+------------------+-----------+ -``` + mysql> show master status; + +------------------+-----------+ + | Log_name | File_size | + +------------------+-----------+ + | mysql-bin.000051 | 967 | + | mysql-bin.000052 | 965 | + +------------------+-----------+ + ``` 2. 
最新的binlog文件是mysql-bin.000052,我们再定位误操作SQL的binlog位置。误操作人只能知道大致的误操作时间,我们根据大致时间过滤数据。 ```bash -shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -ttbl --start-file='mysql-bin.000052' --start-datetime='2016-12-13 20:25:00' --stop-datetime='2016-12-13 20:30:00' -输出: -INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-13 20:26:00', 4, '小李'); #start 317 end 487 time 2016-12-13 20:26:26 -UPDATE `test`.`tbl` SET `addtime`='2016-12-12 00:00:00', `id`=4, `name`='小李' WHERE `addtime`='2016-12-13 20:26:00' AND `id`=4 AND `name`='小李' LIMIT 1; #start 514 end 701 time 2016-12-13 20:27:07 -DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-10 00:04:33' AND `id`=1 AND `name`='小赵' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 -DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-10 00:04:48' AND `id`=2 AND `name`='小钱' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 -DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-13 20:25:00' AND `id`=3 AND `name`='小孙' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 -DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-12 00:00:00' AND `id`=4 AND `name`='小李' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 -``` + shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -ttbl --start-file='mysql-bin.000052' --start-datetime='2016-12-13 20:25:00' --stop-datetime='2016-12-13 20:30:00' + 输出: + INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-13 20:26:00', 4, '小李'); #start 317 end 487 time 2016-12-13 20:26:26 + UPDATE `test`.`tbl` SET `addtime`='2016-12-12 00:00:00', `id`=4, `name`='小李' WHERE `addtime`='2016-12-13 20:26:00' AND `id`=4 AND `name`='小李' LIMIT 1; #start 514 end 701 time 2016-12-13 20:27:07 + DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-10 00:04:33' AND `id`=1 AND `name`='小赵' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 + DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-10 00:04:48' AND `id`=2 AND `name`='小钱' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 + DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-13 20:25:00' AND `id`=3 AND `name`='小孙' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 + DELETE FROM `test`.`tbl` WHERE `addtime`='2016-12-12 00:00:00' AND `id`=4 AND `name`='小李' LIMIT 1; #start 728 end 938 time 2016-12-13 20:28:05 + ``` 3. 
我们得到了误操作sql的准确位置在728-938之间,再根据位置进一步过滤,使用flashback模式生成回滚sql,检查回滚sql是否正确(注:真实环境下,此步经常会进一步筛选出需要的sql。结合grep、编辑器等) ```bash -shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -ttbl --start-file='mysql-bin.000052' --start-position=3346 --stop-position=3556 -B > rollback.sql | cat -输出: -INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-12 00:00:00', 4, '小李'); #start 728 end 938 time 2016-12-13 20:28:05 -INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-13 20:25:00', 3, '小孙'); #start 728 end 938 time 2016-12-13 20:28:05 -INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:48', 2, '小钱'); #start 728 end 938 time 2016-12-13 20:28:05 -INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', 1, '小赵'); #start 728 end 938 time 2016-12-13 20:28:05 -``` + shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -ttbl --start-file='mysql-bin.000052' --start-position=3346 --stop-position=3556 -B > rollback.sql | cat + 输出: + INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-12 00:00:00', 4, '小李'); #start 728 end 938 time 2016-12-13 20:28:05 + INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-13 20:25:00', 3, '小孙'); #start 728 end 938 time 2016-12-13 20:28:05 + INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:48', 2, '小钱'); #start 728 end 938 time 2016-12-13 20:28:05 + INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', 1, '小赵'); #start 728 end 938 time 2016-12-13 20:28:05 + ``` 4. 确认回滚sql正确,执行回滚语句。登录mysql确认,数据回滚成功。 @@ -181,21 +185,22 @@ INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', shell> mysql -h127.0.0.1 -P3306 -uadmin -p'admin' < rollback.sql mysql> select * from tbl; -+----+--------+---------------------+ -| id | name | addtime | -+----+--------+---------------------+ -| 1 | 小赵 | 2016-12-10 00:04:33 | -| 2 | 小钱 | 2016-12-10 00:04:48 | -| 3 | 小孙 | 2016-12-13 20:25:00 | -| 4 | 小李 | 2016-12-12 00:00:00 | -+----+--------+---------------------+ -``` + +----+--------+---------------------+ + | id | name | addtime | + +----+--------+---------------------+ + | 1 | 小赵 | 2016-12-10 00:04:33 | + | 2 | 小钱 | 2016-12-10 00:04:48 | + | 3 | 小孙 | 2016-12-13 20:25:00 | + | 4 | 小李 | 2016-12-12 00:00:00 | + +----+--------+---------------------+ + ``` + +### 限制 -###限制 * mysql server必须开启,离线模式下不能解析 * 参数 _binlog\_row\_image_ 必须为FULL,暂不支持MINIMAL -###优点(对比mysqlbinlog) +### 优点(对比mysqlbinlog) * 纯Python开发,安装与使用都很简单 * 自带flashback、no-primary-key解析模式,无需再装补丁 @@ -203,14 +208,15 @@ INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', * 解析为标准SQL,方便理解、调试 * 代码容易改造,可以支持更多个性化解析 -###贡献者 +### 贡献者 * danfengcao 维护者 [https://github.com/danfengcao](https://github.com/danfengcao) * 大众点评DBA团队 想法交流,使用体验 [dba_op@dianping.com](dba_op@dianping.com) * 赵承勇 pymysqlreplication权限bug [https://github.com/imzcy1987](https://github.com/imzcy1987) * 陈路炳 bug报告(字段值为空时的处理),使用体验 [https://github.com/bingluchen](https://github.com/bingluchen) -###联系我 +### 联系我 + 有任何问题,请与我联系。微信:danfeng053005 邮箱:[danfengcao.info@gmail.com](danfengcao.info@gmail.com) 欢迎提问题提需求,欢迎pull requests! 
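
The rollback example above relies on one simple rule: with `binlog_format = row` and `binlog_row_image = FULL`, every event carries complete row images, so an INSERT can be undone by a DELETE, a DELETE by an INSERT, and an UPDATE by an UPDATE with the before/after images swapped. The sketch below shows only that inversion rule — the function and helper names are invented for illustration, and binlog2sql itself builds the same statements from pymysqlreplication row images and escapes values with `cursor.mogrify` rather than the naive quoting used here.

```python
# -*- coding: utf-8 -*-
# Illustrative sketch only; invert_row_event()/quote() are hypothetical names.


def quote(value):
    """Very rough literal quoting, good enough for a demo (NOT for production SQL)."""
    if value is None:
        return 'NULL'
    if isinstance(value, (int, float)):
        return str(value)
    return "'%s'" % str(value).replace("'", "''")


def invert_row_event(event_type, schema, table, row):
    """Return the rollback statement for one row image of a row-based binlog event."""
    full_table = '`%s`.`%s`' % (schema, table)
    if event_type == 'INSERT':      # a written row is undone by deleting it again
        cond = ' AND '.join('`%s`=%s' % (k, quote(v)) for k, v in sorted(row['values'].items()))
        return 'DELETE FROM %s WHERE %s LIMIT 1;' % (full_table, cond)
    if event_type == 'DELETE':      # a deleted row is undone by putting it back
        cols = ', '.join('`%s`' % k for k in sorted(row['values']))
        vals = ', '.join(quote(row['values'][k]) for k in sorted(row['values']))
        return 'INSERT INTO %s(%s) VALUES (%s);' % (full_table, cols, vals)
    if event_type == 'UPDATE':      # an update is undone by swapping before/after images
        set_part = ', '.join('`%s`=%s' % (k, quote(v)) for k, v in sorted(row['before_values'].items()))
        cond = ' AND '.join('`%s`=%s' % (k, quote(v)) for k, v in sorted(row['after_values'].items()))
        return 'UPDATE %s SET %s WHERE %s LIMIT 1;' % (full_table, set_part, cond)
    raise ValueError('unsupported event type: %s' % event_type)


if __name__ == '__main__':
    row = {'values': {'id': 4, 'name': '小李', 'addtime': '2016-12-12 00:00:00'}}
    print(invert_row_event('DELETE', 'test', 'tbl', row))
    # INSERT INTO `test`.`tbl`(`addtime`, `id`, `name`) VALUES ('2016-12-12 00:00:00', 4, '小李');
```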
From e6883ec700a23c53d8852ec3acda1903fd7b1840 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Fri, 7 Apr 2017 17:18:45 +0800 Subject: [PATCH 07/22] fix priciple-and-practice.md --- .../mysql-flashback-priciple-and-practice.md | 67 ++++++++++--------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/example/mysql-flashback-priciple-and-practice.md b/example/mysql-flashback-priciple-and-practice.md index 58ba2dd..48d8b3a 100644 --- a/example/mysql-flashback-priciple-and-practice.md +++ b/example/mysql-flashback-priciple-and-practice.md @@ -135,37 +135,37 @@ mysql> select count(*) from user; 1. 登录mysql,查看目前的binlog文件 ```bash -mysql> show master logs; -+------------------+-----------+ -| Log_name | File_size | -+------------------+-----------+ -| mysql-bin.000053 | 168652863 | -| mysql-bin.000054 | 504549 | -+------------------+-----------+ -``` + mysql> show master logs; + +------------------+-----------+ + | Log_name | File_size | + +------------------+-----------+ + | mysql-bin.000053 | 168652863 | + | mysql-bin.000054 | 504549 | + +------------------+-----------+ + ``` 2. 最新的binlog文件是mysql-bin.000054。我们的目标是筛选出需要回滚的SQL,由于误操作人只知道大致的误操作时间,我们首先根据时间做一次过滤。只需要解析test库user表。(注:如果有多个sql误操作,则生成的binlog可能分布在多个文件,需解析多个文件) ```bash -shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -tuser --start-file='mysql-bin.000054' --start-datetime='2016-12-26 11:44:00' --stop-datetime='2016-12-26 11:50:00' > /tmp/raw.sql + shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -tuser --start-file='mysql-bin.000054' --start-datetime='2016-12-26 11:44:00' --stop-datetime='2016-12-26 11:50:00' > /tmp/raw.sql raw.sql 输出: -DELETE FROM `test`.`user` WHERE `addtime`='2014-11-11 00:04:48' AND `id`=2 AND `name`='小钱' LIMIT 1; #start 257427 end 265754 time 2016-12-26 11:44:56 -DELETE FROM `test`.`user` WHERE `addtime`='2015-11-11 20:25:00' AND `id`=3 AND `name`='小孙' LIMIT 1; #start 257427 end 265754 time 2016-12-26 11:44:56 -... -DELETE FROM `test`.`user` WHERE `addtime`='2016-12-14 23:09:07' AND `id`=24530 AND `name`='tt' LIMIT 1; #start 257427 end 504272 time 2016-12-26 11:44:56 -INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', 32722, '小王'); #start 504299 end 504522 time 2016-12-26 11:49:42 -... -``` + DELETE FROM `test`.`user` WHERE `addtime`='2014-11-11 00:04:48' AND `id`=2 AND `name`='小钱' LIMIT 1; #start 257427 end 265754 time 2016-12-26 11:44:56 + DELETE FROM `test`.`user` WHERE `addtime`='2015-11-11 20:25:00' AND `id`=3 AND `name`='小孙' LIMIT 1; #start 257427 end 265754 time 2016-12-26 11:44:56 + ... + DELETE FROM `test`.`user` WHERE `addtime`='2016-12-14 23:09:07' AND `id`=24530 AND `name`='tt' LIMIT 1; #start 257427 end 504272 time 2016-12-26 11:44:56 + INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-10 00:04:33', 32722, '小王'); #start 504299 end 504522 time 2016-12-26 11:49:42 + ... + ``` 3. 
根据位置信息,我们确定了误操作sql来自同一个事务,准确位置在257427-504272之间(binlog2sql对于同一个事务会输出同样的start position)。再根据位置过滤,使用 _**-B**_ 选项生成回滚sql,检查回滚sql是否正确。(注:真实场景下,生成的回滚SQL经常会需要进一步筛选。结合grep、编辑器等) ```bash -shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -tuser --start-file='mysql-bin.000054' --start-position=257427 --stop-position=504272 -B > /tmp/rollback.sql + shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -tuser --start-file='mysql-bin.000054' --start-position=257427 --stop-position=504272 -B > /tmp/rollback.sql rollback.sql 输出: -INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-14 23:09:07', 24530, 'tt'); #start 257427 end 504272 time 2016-12-26 11:44:56 -INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-12 00:00:00', 24529, '小李'); #start 257427 end 504272 time 2016-12-26 11:44:56 -... -INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2014-11-11 00:04:48', 2, '小钱'); #start 257427 end 265754 time 2016-12-26 11:44:56 + INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-14 23:09:07', 24530, 'tt'); #start 257427 end 504272 time 2016-12-26 11:44:56 + INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-12 00:00:00', 24529, '小李'); #start 257427 end 504272 time 2016-12-26 11:44:56 + ... + INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2014-11-11 00:04:48', 2, '小钱'); #start 257427 end 265754 time 2016-12-26 11:44:56 shell> wc -l /tmp/rollback.sql 16128 /tmp/rollback.sql @@ -177,14 +177,15 @@ INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2014-11-11 00:04:48' shell> mysql -h127.0.0.1 -P3306 -uadmin -p'admin' < /tmp/rollback.sql mysql> select count(*) from user; -+----------+ -| count(*) | -+----------+ -| 16389 | -+----------+ -``` + +----------+ + | count(*) | + +----------+ + | 16389 | + +----------+ + ``` + +### TIPS -###TIPS * 闪回的关键是快速筛选出真正需要回滚的SQL。 * 先根据库、表、时间做一次过滤,再根据位置做更准确的过滤。 * 由于数据一直在写入,要确保回滚sql中不包含其他数据。可根据是否是同一事务、误操作行数、字段值的特征等等来帮助判断。 @@ -192,15 +193,17 @@ INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2014-11-11 00:04:48' * 如果待回滚的表与其他表有关联,要与开发说明回滚和不回滚各自的副作用,再确定方案。 * 回滚后数据变化,可能对用户和线上应用造成困惑(类似幻读)。 -####再重复下最重要的两点:**筛选出正确SQL**!**沟通清楚**! +#### 再重复下最重要的两点:**筛选出正确SQL**!**沟通清楚**! 
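
The position-based filtering recommended in the TIPS above can be scripted instead of done by hand with grep. Below is a minimal sketch, assuming the `#start X end Y` trailer that binlog2sql appends to every statement; the file names, function name and position range are placeholders for this example.

```python
import re

# Hypothetical helper: keep only the statements in raw.sql whose
# "#start X end Y" trailer lies inside the bad transaction's position range.
TRAILER = re.compile(r'#start (\d+) end (\d+)')


def filter_by_position(in_file, out_file, start_pos, stop_pos):
    """Copy statements whose binlog start/end positions fall within [start_pos, stop_pos]."""
    with open(in_file) as fin, open(out_file, 'w') as fout:
        for line in fin:
            match = TRAILER.search(line)
            if match and start_pos <= int(match.group(1)) and int(match.group(2)) <= stop_pos:
                fout.write(line)


# Example, using the positions from the case above:
# filter_by_position('/tmp/raw.sql', '/tmp/to_check.sql', 257427, 504272)
```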
闪回工具 === + MySQL闪回特性最早由阿里彭立勋开发,彭在2012年给官方提交了一个patch,并对[闪回设计思路](http://www.penglixun.com/tech/database/mysql_flashback_feature.html)做了说明(设计思路很有启发性,强烈推荐阅读)。但是因为种种原因,业内安装这个patch的团队至今还是少数,真正应用到线上的更是少之又少。彭之后,又有多位人员针对不同mysql版本不同语言开发了闪回工具,原理用的都是彭的思路。 我将这些闪回工具按实现方式分成了三类。 * 第一类是以patch形式集成到官方工具mysqlbinlog中。以彭提交的patch为代表。 + > 优点 > > * 上手成本低。mysqlbinlog原有的选项都能直接利用,只是多加了一个闪回选项。闪回特性未来有可能被官方收录。 @@ -215,6 +218,7 @@ MySQL闪回特性最早由阿里彭立勋开发,彭在2012年给官方提交 > 这些缺点,可能都是闪回没有流行开来的原因。 * 第二类是独立工具,通过伪装成slave拉取binlog来进行处理。以binlog2sql为代表。 + > 优点 > > * 兼容性好。伪装成slave拉binlog这项技术在业界应用的非常广泛,多个开发语言都有这样的活跃项目,MySQL版本的兼容性由这些项目搞定,闪回工具的兼容问题不再突出。 @@ -226,6 +230,7 @@ MySQL闪回特性最早由阿里彭立勋开发,彭在2012年给官方提交 > * 必须开启MySQL server。 * 第三类是简单脚本。先用mysqlbinlog解析出文本格式的binlog,再根据回滚原理用正则进行匹配并替换。 + > 优点 > > * 脚本写起来方便,往往能快速搞定某个特定问题。 @@ -240,7 +245,8 @@ MySQL闪回特性最早由阿里彭立勋开发,彭在2012年给官方提交 就目前的闪回工具而言,线上环境的闪回,笔者建议使用binlog2sql,离线解析使用mysqlbinlog。 -###关于DDL的flashback +### 关于DDL的flashback + 本文所述的flashback仅针对DML语句的快速回滚。但如果误操作是DDL的话,是无法利用binlog做快速回滚的,因为即使在row模式下,binlog对于DDL操作也不会记录每行数据的变化。要实现DDL快速回滚,必须修改MySQL源码,使得在执行DDL前先备份老数据。目前有多个mysql定制版本实现了DDL闪回特性,阿里林晓斌团队提交了patch给MySQL官方,MariaDB预计在不久后加入包含DDL的flashback特性。DDL闪回的副作用是会增加额外存储。考虑到其应用频次实在过低,本文不做详述,有兴趣的同学可以自己去了解,重要的几篇文章我在参考资料中做了引用。 @@ -249,6 +255,7 @@ MySQL闪回特性最早由阿里彭立勋开发,彭在2012年给官方提交 参考资料 ============== + [1] MySQL Internals Manual , [Chapter 20 The Binary Log](http://dev.mysql.com/doc/internals/en/binary-log.html) From d8e21693783dd6b4f092a38e8ef3c26e6e57bb3d Mon Sep 17 00:00:00 2001 From: danfengcao Date: Fri, 7 Apr 2017 17:21:56 +0800 Subject: [PATCH 08/22] fix priciple-and-practice.md --- example/mysql-flashback-priciple-and-practice.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/example/mysql-flashback-priciple-and-practice.md b/example/mysql-flashback-priciple-and-practice.md index 48d8b3a..9ed7706 100644 --- a/example/mysql-flashback-priciple-and-practice.md +++ b/example/mysql-flashback-priciple-and-practice.md @@ -148,7 +148,8 @@ mysql> select count(*) from user; ```bash shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -tuser --start-file='mysql-bin.000054' --start-datetime='2016-12-26 11:44:00' --stop-datetime='2016-12-26 11:50:00' > /tmp/raw.sql -raw.sql 输出: + + raw.sql输出: DELETE FROM `test`.`user` WHERE `addtime`='2014-11-11 00:04:48' AND `id`=2 AND `name`='小钱' LIMIT 1; #start 257427 end 265754 time 2016-12-26 11:44:56 DELETE FROM `test`.`user` WHERE `addtime`='2015-11-11 20:25:00' AND `id`=3 AND `name`='小孙' LIMIT 1; #start 257427 end 265754 time 2016-12-26 11:44:56 ... @@ -161,7 +162,8 @@ raw.sql 输出: ```bash shell> python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -tuser --start-file='mysql-bin.000054' --start-position=257427 --stop-position=504272 -B > /tmp/rollback.sql -rollback.sql 输出: + + rollback.sql 输出: INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-14 23:09:07', 24530, 'tt'); #start 257427 end 504272 time 2016-12-26 11:44:56 INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2016-12-12 00:00:00', 24529, '小李'); #start 257427 end 504272 time 2016-12-26 11:44:56 ... 
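
The second category above — a standalone tool that pretends to be a replica (伪装成slave) and pulls the binlog over the replication protocol — is the approach binlog2sql takes, via the python-mysql-replication library. A minimal sketch of that pattern follows; the connection settings, `server_id` and binlog coordinates are placeholders, and the time/position filtering and SQL generation that the real tool performs are omitted.

```python
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    WriteRowsEvent,
    UpdateRowsEvent,
    DeleteRowsEvent,
)

# Placeholder connection info; the account needs the REPLICATION SLAVE /
# REPLICATION CLIENT privileges listed in the README.
MYSQL_SETTINGS = {'host': '127.0.0.1', 'port': 3306, 'user': 'admin', 'passwd': 'admin'}

stream = BinLogStreamReader(
    connection_settings=MYSQL_SETTINGS,
    server_id=10086,                 # must not clash with another replica's server_id
    log_file='mysql-bin.000054',     # where to start reading (placeholder)
    log_pos=4,
    only_schemas=['test'],
    only_tables=['user'],
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    resume_stream=True,
)

for binlog_event in stream:
    for row in binlog_event.rows:
        # INSERT/DELETE events carry row['values']; UPDATE events carry
        # row['before_values'] and row['after_values'].
        print(binlog_event.schema, binlog_event.table, row)

stream.close()
```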
From 7a44278a29d020bb6e9ecb98cb586e018598d0ef Mon Sep 17 00:00:00 2001 From: danfengcao Date: Fri, 7 Apr 2017 17:23:18 +0800 Subject: [PATCH 09/22] fix priciple-and-practice.md --- example/mysql-flashback-priciple-and-practice.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/mysql-flashback-priciple-and-practice.md b/example/mysql-flashback-priciple-and-practice.md index 9ed7706..4c8b027 100644 --- a/example/mysql-flashback-priciple-and-practice.md +++ b/example/mysql-flashback-priciple-and-practice.md @@ -170,7 +170,7 @@ mysql> select count(*) from user; INSERT INTO `test`.`user`(`addtime`, `id`, `name`) VALUES ('2014-11-11 00:04:48', 2, '小钱'); #start 257427 end 265754 time 2016-12-26 11:44:56 shell> wc -l /tmp/rollback.sql -16128 /tmp/rollback.sql + 16128 /tmp/rollback.sql ``` 4. 与业务方确认回滚sql没问题,执行回滚语句。登录mysql,确认回滚成功。 From c510a632a57ed96a7dc3aad421672b70645b7ce5 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Tue, 16 May 2017 16:29:24 +0800 Subject: [PATCH 10/22] update package version --- README.md | 2 +- requirements.txt | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 139c1cf..9059e3d 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ binlog2sql =========== * 数据快速回滚(闪回) -* 主从切换后数据不一致的修复 +* 主从切换后新master丢数据的修复 * 从binlog生成标准SQL,带来的衍生功能 diff --git a/requirements.txt b/requirements.txt index fb691af..c89ac75 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -PyMySQL==0.7.8 -wheel==0.24.0 -mysql-replication==0.9 +PyMySQL==0.7.11 +wheel==0.29.0 +mysql-replication==0.13 From 8ffa5ac3dbae99ef7283e1f61e5e3f25a15153e7 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Wed, 7 Jun 2017 15:59:25 +0800 Subject: [PATCH 11/22] midify README --- README.md | 3 ++- example/mysql-flashback-priciple-and-practice.md | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9059e3d..ad3eacd 100644 --- a/README.md +++ b/README.md @@ -195,10 +195,11 @@ Empty set (0.00 sec) +----+--------+---------------------+ ``` -### 限制 +### 限制(对比mysqlbinlog) * mysql server必须开启,离线模式下不能解析 * 参数 _binlog\_row\_image_ 必须为FULL,暂不支持MINIMAL +* 解析速度不如mysqlbinlog ### 优点(对比mysqlbinlog) diff --git a/example/mysql-flashback-priciple-and-practice.md b/example/mysql-flashback-priciple-and-practice.md index 4c8b027..cde5815 100644 --- a/example/mysql-flashback-priciple-and-practice.md +++ b/example/mysql-flashback-priciple-and-practice.md @@ -188,14 +188,13 @@ mysql> select count(*) from user; ### TIPS -* 闪回的关键是快速筛选出真正需要回滚的SQL。 +* 闪回的目标:快速筛选出真正需要回滚的数据。 * 先根据库、表、时间做一次过滤,再根据位置做更准确的过滤。 * 由于数据一直在写入,要确保回滚sql中不包含其他数据。可根据是否是同一事务、误操作行数、字段值的特征等等来帮助判断。 -* 执行回滚sql时如有报错,需要查实具体原因,一般是因为对应的数据已发生变化。由于是严格的行模式,只要有唯一键(包括主键)存在,就只会报某条数据不存在的错,不必担心会更新不该操作的数据。 -* 如果待回滚的表与其他表有关联,要与开发说明回滚和不回滚各自的副作用,再确定方案。 -* 回滚后数据变化,可能对用户和线上应用造成困惑(类似幻读)。 +* 执行回滚sql时如有报错,需要查实具体原因,一般是因为对应的数据已发生变化。由于是严格的行模式,只要有唯一键(包括主键)存在,就只会报某条数据不存在的错,不必担心会更新不该操作的数据。业务如果有特殊逻辑,数据回滚可能会带来影响。 +* 如果只回滚某张表,并且该表有关联表,关联表并不会被回滚,需与业务方沟通清楚。 -#### 再重复下最重要的两点:**筛选出正确SQL**!**沟通清楚**! 
+#### **哪些数据需要回滚,让业务方来判断!** 闪回工具 === From 114521b2b2e5815db5189f3c3e277a185386442b Mon Sep 17 00:00:00 2001 From: danfengcao Date: Mon, 14 Aug 2017 11:56:12 +0800 Subject: [PATCH 12/22] use env to find python interpreter instead of hardcode path --- binlog2sql/binlog2sql.py | 2 +- binlog2sql/binlog2sql_util.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index 53d029d..c74d72f 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # -*- coding: utf-8 -*- import os, sys, datetime diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index c3c5f95..6059192 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # -*- coding: utf-8 -*- import os, sys, argparse, datetime From b94ae4440dcaf2f3c45675b9719b42fc57ec1712 Mon Sep 17 00:00:00 2001 From: zouyi Date: Wed, 29 Nov 2017 09:27:18 +0800 Subject: [PATCH 13/22] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ad3eacd..b63ead1 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` -K, --no-primary-key 对INSERT语句去除主键。可选。 --B, --flashback 生成回滚语句,可解析大文件,不受内存限制,每打印一千行加一句SLEEP SELECT(1)。可选。与stop-never或no-primary-key不能同时添加。 +-B, --flashback 生成回滚语句,可解析大文件,不受内存限制,每打印一千行加一句SELECT SLEEP(1)。可选。与stop-never或no-primary-key不能同时添加。 **解析范围控制** From f430ecc8ef5d5c611f076a69c1511b2627330b03 Mon Sep 17 00:00:00 2001 From: dfcao Date: Mon, 11 Dec 2017 15:39:09 +0800 Subject: [PATCH 14/22] standardize code format to comply with pep8 --- binlog2sql/binlog2sql.py | 182 +++++++++++++++++-------------- binlog2sql/binlog2sql_util.py | 197 +++++++++++++++++++--------------- tests/test_binlog2sql_util.py | 52 ++++----- 3 files changed, 235 insertions(+), 196 deletions(-) diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index c74d72f..89e641f 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -1,7 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os, sys, datetime +import os +import sys +import datetime import pymysql from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import ( @@ -10,114 +12,130 @@ DeleteRowsEvent, ) from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent -from binlog2sql_util import command_line_args, concat_sql_from_binlogevent, create_unique_file, reversed_lines +from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, reversed_lines + class Binlog2sql(object): - def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, endPos=None, startTime=None, - stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False): - ''' - connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave} - ''' - if not startFile: - raise ValueError('lack of parameter,startFile.') - - self.connectionSettings = connectionSettings - self.startFile = startFile - self.startPos = startPos if startPos else 4 # use binlog v4 - self.endFile = endFile if endFile else startFile - self.endPos = endPos - self.startTime = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S") if startTime else datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d 
%H:%M:%S") - self.stopTime = datetime.datetime.strptime(stopTime, "%Y-%m-%d %H:%M:%S") if stopTime else datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S") + def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None, + start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False, + flashback=False, stop_never=False): + """ + conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'} + """ + + if not start_file: + raise ValueError('Lack of parameter: start_file') + + self.conn_setting = connection_settings + self.start_file = start_file + self.start_pos = start_pos if start_pos else 4 # use binlog v4 + self.end_file = end_file if end_file else start_file + self.end_pos = end_pos + if start_time: + self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") + else: + self.start_time = datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") + if stop_time: + self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S") + else: + self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S") self.only_schemas = only_schemas if only_schemas else None self.only_tables = only_tables if only_tables else None - self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever) + self.no_pk, self.flashback, self.stop_never = (no_pk, flashback, stop_never) self.binlogList = [] - self.connection = pymysql.connect(**self.connectionSettings) - try: - cur = self.connection.cursor() - cur.execute("SHOW MASTER STATUS") - self.eofFile, self.eofPos = cur.fetchone()[:2] - cur.execute("SHOW MASTER LOGS") - binIndex = [row[0] for row in cur.fetchall()] - if self.startFile not in binIndex: - raise ValueError('parameter error: startFile %s not in mysql server' % self.startFile) + self.connection = pymysql.connect(**self.conn_setting) + with self.connection as cursor: + cursor.execute("SHOW MASTER STATUS") + self.eof_file, self.eof_pos = cursor.fetchone()[:2] + cursor.execute("SHOW MASTER LOGS") + bin_index = [row[0] for row in cursor.fetchall()] + if self.start_file not in bin_index: + raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file) binlog2i = lambda x: x.split('.')[1] - for bin in binIndex: - if binlog2i(bin) >= binlog2i(self.startFile) and binlog2i(bin) <= binlog2i(self.endFile): - self.binlogList.append(bin) - - cur.execute("SELECT @@server_id") - self.serverId = cur.fetchone()[0] - if not self.serverId: - raise ValueError('need set server_id in mysql server %s:%s' % (self.connectionSettings['host'], self.connectionSettings['port'])) - finally: - cur.close() + for binary in bin_index: + if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file): + self.binlogList.append(binary) + + cursor.execute("SELECT @@server_id") + self.server_id = cursor.fetchone()[0] + if not self.server_id: + raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port'])) def process_binlog(self): - stream = BinLogStreamReader(connection_settings=self.connectionSettings, server_id=self.serverId, - log_file=self.startFile, log_pos=self.startPos, only_schemas=self.only_schemas, + stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id, + log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, only_tables=self.only_tables, resume_stream=True) - cur = self.connection.cursor() - tmpFile 
= create_unique_file('%s.%s' % (self.connectionSettings['host'],self.connectionSettings['port'])) # to simplify code, we do not use file lock for tmpFile. - ftmp = open(tmpFile ,"w") - flagLastEvent = False - eStartPos, lastPos = stream.log_pos, stream.log_pos + cursor = self.connection.cursor() + # to simplify code, we do not use flock for tmp_file. + tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port'])) + f_tmp = open(tmp_file, "w") + flag_last_event = False + e_start_pos, last_pos = stream.log_pos, stream.log_pos try: - for binlogevent in stream: - if not self.stopnever: - if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos): - flagLastEvent = True - elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime: - if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): - lastPos = binlogevent.packet.log_pos + for binlog_event in stream: + if not self.stop_never: + if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \ + (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos): + flag_last_event = True + elif datetime.datetime.fromtimestamp(binlog_event.timestamp) < self.start_time: + if not (isinstance(binlog_event, RotateEvent) + or isinstance(binlog_event, FormatDescriptionEvent)): + last_pos = binlog_event.packet.log_pos continue - elif (stream.log_file not in self.binlogList) or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime): + elif (stream.log_file not in self.binlogList) or \ + (self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \ + (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \ + (datetime.datetime.fromtimestamp(binlog_event.timestamp) >= self.stop_time): break # else: # raise ValueError('unknown binlog file or position') - if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN': - eStartPos = lastPos + if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN': + e_start_pos = last_pos - if isinstance(binlogevent, QueryEvent): - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk) + if isinstance(binlog_event, QueryEvent): + sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, + flashback=self.flashback, no_pk=self.no_pk) if sql: - print sql - elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent): - for row in binlogevent.rows: - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos) + print(sql) + elif isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) or\ + isinstance(binlog_event, DeleteRowsEvent): + for row in binlog_event.rows: + sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk, + row=row, flashback=self.flashback, e_start_pos=e_start_pos) if self.flashback: - ftmp.write(sql + '\n') + f_tmp.write(sql + '\n') else: - print sql + print(sql) - if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): - lastPos = 
binlogevent.packet.log_pos - if flagLastEvent: + if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)): + last_pos = binlog_event.packet.log_pos + if flag_last_event: break - ftmp.close() + f_tmp.close() if self.flashback: - self.print_rollback_sql(tmpFile) + self.print_rollback_sql(filename=tmp_file) finally: - os.remove(tmpFile) - cur.close() + os.remove(tmp_file) + cursor.close() stream.close() return True - def print_rollback_sql(self, fin): - '''print rollback sql from tmpfile''' - with open(fin) as ftmp: - sleepInterval = 1000 + @staticmethod + def print_rollback_sql(filename): + """print rollback sql from tmp_file""" + with open(filename) as f_tmp: + sleep_interval = 1000 i = 0 - for line in reversed_lines(ftmp): - print line.rstrip() - if i >= sleepInterval: - print 'SELECT SLEEP(1);' + for line in reversed_lines(f_tmp): + print(line.rstrip()) + if i >= sleep_interval: + print('SELECT SLEEP(1);') i = 0 else: i += 1 @@ -129,9 +147,9 @@ def __del__(self): if __name__ == '__main__': args = command_line_args(sys.argv[1:]) - connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password} - binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile, - startPos=args.startPos, endFile=args.endFile, endPos=args.endPos, - startTime=args.startTime, stopTime=args.stopTime, only_schemas=args.databases, - only_tables=args.tables, nopk=args.nopk, flashback=args.flashback, stopnever=args.stopnever) + conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'} + binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, + end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time, + stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables, + no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never) binlog2sql.process_binlog() diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 6059192..b10a7e1 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -1,15 +1,16 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os, sys, argparse, datetime -import pymysql -from pymysqlreplication import BinLogStreamReader +import os +import sys +import argparse +import datetime from pymysqlreplication.row_event import ( WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, ) -from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent +from pymysqlreplication.event import QueryEvent def is_valid_datetime(string): @@ -19,23 +20,25 @@ def is_valid_datetime(string): except: return False + def create_unique_file(filename): version = 0 - resultFile = filename + result_file = filename # if we have to try more than 1000 times, something is seriously wrong - while os.path.exists(resultFile) and version<1000: - resultFile = filename + '.' + str(version) + while os.path.exists(result_file) and version < 1000: + result_file = filename + '.' 
+ str(version) version += 1 if version >= 1000: raise OSError('cannot create unique file %s.[0-1000]' % filename) - return resultFile + return result_file + -def parse_args(args): +def parse_args(): """parse args for binlog2sql""" parser = argparse.ArgumentParser(description='Parse MySQL binlog to SQL you want', add_help=False) connect_setting = parser.add_argument_group('connect setting') - connect_setting.add_argument('-h','--host', dest='host', type=str, + connect_setting.add_argument('-h', '--host', dest='host', type=str, help='Host the MySQL database server located', default='127.0.0.1') connect_setting.add_argument('-u', '--user', dest='user', type=str, help='MySQL Username to log in as', default='root') @@ -43,23 +46,31 @@ def parse_args(args): help='MySQL Password to use', default='') connect_setting.add_argument('-P', '--port', dest='port', type=int, help='MySQL port to use', default=3306) - range = parser.add_argument_group('range filter') - range.add_argument('--start-file', dest='startFile', type=str, - help='Start binlog file to be parsed') - range.add_argument('--start-position', '--start-pos', dest='startPos', type=int, - help='Start position of the --start-file', default=4) - range.add_argument('--stop-file', '--end-file', dest='endFile', type=str, - help="Stop binlog file to be parsed. default: '--start-file'", default='') - range.add_argument('--stop-position', '--end-pos', dest='endPos', type=int, - help="Stop position of --stop-file. default: latest position of '--stop-file'", default=0) - range.add_argument('--start-datetime', dest='startTime', type=str, - help="Start reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='') - range.add_argument('--stop-datetime', dest='stopTime', type=str, - help="Stop reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='') - parser.add_argument('--stop-never', dest='stopnever', action='store_true', - help='Wait for more data from the server. default: stop replicate at the last binlog when you start binlog2sql', default=False) - - parser.add_argument('--help', dest='help', action='store_true', help='help infomation', default=False) + interval = parser.add_argument_group('interval filter') + interval.add_argument('--start-file', dest='start_file', type=str, help='Start binlog file to be parsed') + interval.add_argument('--start-position', '--start-pos', dest='start_pos', type=int, + help='Start position of the --start-file', default=4) + interval.add_argument('--stop-file', '--end-file', dest='end_file', type=str, + help="Stop binlog file to be parsed. default: '--start-file'", default='') + interval.add_argument('--stop-position', '--end-pos', dest='end_pos', type=int, + help="Stop position. 
default: latest position of '--stop-file'", default=0) + interval.add_argument('--start-datetime', dest='start_time', type=str, + help="Start reading the binlog at first event having a datetime equal or posterior " + "to the argument; the argument must be a date and time in the local time zone," + " in any format accepted by the MySQL server for DATETIME and TIMESTAMP types," + " for example: 2004-12-25 11:25:56 (you should probably use quotes for your " + "shell to set it properly).", default='') + interval.add_argument('--stop-datetime', dest='stop_time', type=str, + help="Stop reading the binlog at first event having a datetime equal or posterior " + "to the argument; the argument must be a date and time in the local time zone," + " in any format accepted by the MySQL server for DATETIME and TIMESTAMP types," + " for example: 2004-12-25 11:25:56 (you should probably use quotes for your " + "shell to set it properly).", default='') + parser.add_argument('--stop-never', dest='stop_never', action='store_true', + help="Wait for more data from the server. default: stop replicate at the last binlog" + " when you start binlog2sql", default=False) + + parser.add_argument('--help', dest='help', action='store_true', help='help information', default=False) schema = parser.add_argument_group('schema filter') schema.add_argument('-d', '--databases', dest='databases', type=str, nargs='*', @@ -68,26 +79,28 @@ def parse_args(args): help='tables you want to process', default='') # exclusive = parser.add_mutually_exclusive_group() - parser.add_argument('-K', '--no-primary-key', dest='nopk', action='store_true', - help='Generate insert sql without primary key if exists', default=False) + parser.add_argument('-K', '--no-primary-key', dest='no_pk', action='store_true', + help='Generate insert sql without primary key if exists', default=False) parser.add_argument('-B', '--flashback', dest='flashback', action='store_true', - help='Flashback data to start_postition of start_file', default=False) + help='Flashback data to start_position of start_file', default=False) return parser + def command_line_args(args): - needPrintHelp = False if args else True - parser = parse_args(args) + need_print_help = False if args else True + parser = parse_args() args = parser.parse_args(args) - if args.help or needPrintHelp: + if args.help or need_print_help: parser.print_help() sys.exit(1) - if not args.startFile: - raise ValueError('Lack of parameter: startFile') - if args.flashback and args.stopnever: + if not args.start_file: + raise ValueError('Lack of parameter: start_file') + if args.flashback and args.stop_never: raise ValueError('Only one of flashback or stop-never can be True') - if args.flashback and args.nopk: - raise ValueError('Only one of flashback or nopk can be True') - if (args.startTime and not is_valid_datetime(args.startTime)) or (args.stopTime and not is_valid_datetime(args.stopTime)): + if args.flashback and args.no_pk: + raise ValueError('Only one of flashback or no_pk can be True') + if (args.start_time and not is_valid_datetime(args.start_time)) or \ + (args.stop_time and not is_valid_datetime(args.stop_time)): raise ValueError('Incorrect datetime argument') return args @@ -99,6 +112,7 @@ def compare_items((k, v)): else: return '`%s`=%%s' % k + def fix_object(value): """Fixes python objects so that they can be properly inserted into SQL queries""" if isinstance(value, unicode): @@ -106,96 +120,103 @@ def fix_object(value): else: return value -def concat_sql_from_binlogevent(cursor, binlogevent, 
row=None, eStartPos=None, flashback=False, nopk=False): - if flashback and nopk: - raise ValueError('only one of flashback or nopk can be True') - if not (isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent) or isinstance(binlogevent, QueryEvent)): - raise ValueError('binlogevent must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent') + +def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, flashback=False, no_pk=False): + if flashback and no_pk: + raise ValueError('only one of flashback or no_pk can be True') + if not (isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) + or isinstance(binlog_event, DeleteRowsEvent) or isinstance(binlog_event, QueryEvent)): + raise ValueError('binlog_event must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent') sql = '' - if isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent): - pattern = generate_sql_pattern(binlogevent, row=row, flashback=flashback, nopk=nopk) + if isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) \ + or isinstance(binlog_event, DeleteRowsEvent): + pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk) sql = cursor.mogrify(pattern['template'], pattern['values']) - sql += ' #start %s end %s time %s' % (eStartPos, binlogevent.packet.log_pos, datetime.datetime.fromtimestamp(binlogevent.timestamp)) - elif flashback is False and isinstance(binlogevent, QueryEvent) and binlogevent.query != 'BEGIN' and binlogevent.query != 'COMMIT': - if binlogevent.schema: - sql = 'USE {0};\n'.format(binlogevent.schema) - sql += '{0};'.format(fix_object(binlogevent.query)) + time = datetime.datetime.fromtimestamp(binlog_event.timestamp) + sql += ' #start %s end %s time %s' % (e_start_pos, binlog_event.packet.log_pos, time) + elif flashback is False and isinstance(binlog_event, QueryEvent) and binlog_event.query != 'BEGIN' \ + and binlog_event.query != 'COMMIT': + if binlog_event.schema: + sql = 'USE {0};\n'.format(binlog_event.schema) + sql += '{0};'.format(fix_object(binlog_event.query)) return sql -def generate_sql_pattern(binlogevent, row=None, flashback=False, nopk=False): + +def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False): template = '' values = [] if flashback is True: - if isinstance(binlogevent, WriteRowsEvent): + if isinstance(binlog_event, WriteRowsEvent): template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, + binlog_event.schema, binlog_event.table, ' AND '.join(map(compare_items, row['values'].items())) ) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, DeleteRowsEvent): + elif isinstance(binlog_event, DeleteRowsEvent): template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format( - binlogevent.schema, binlogevent.table, - ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())), + binlog_event.schema, binlog_event.table, + ', '.join(map(lambda key: '`%s`' % key, row['values'].keys())), ', '.join(['%s'] * len(row['values'])) ) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, UpdateRowsEvent): + elif isinstance(binlog_event, UpdateRowsEvent): template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, - ', '.join(['`%s`=%%s'%k for k in 
row['before_values'].keys()]), + binlog_event.schema, binlog_event.table, + ', '.join(['`%s`=%%s' % x for x in row['before_values'].keys()]), ' AND '.join(map(compare_items, row['after_values'].items()))) values = map(fix_object, row['before_values'].values()+row['after_values'].values()) else: - if isinstance(binlogevent, WriteRowsEvent): - if nopk: - # print binlogevent.__dict__ - # tableInfo = (binlogevent.table_map)[binlogevent.table_id] + if isinstance(binlog_event, WriteRowsEvent): + if no_pk: + # print binlog_event.__dict__ + # tableInfo = (binlog_event.table_map)[binlog_event.table_id] # if tableInfo.primary_key: # row['values'].pop(tableInfo.primary_key) - if binlogevent.primary_key: - row['values'].pop(binlogevent.primary_key) + if binlog_event.primary_key: + row['values'].pop(binlog_event.primary_key) template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format( - binlogevent.schema, binlogevent.table, - ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())), + binlog_event.schema, binlog_event.table, + ', '.join(map(lambda key: '`%s`' % key, row['values'].keys())), ', '.join(['%s'] * len(row['values'])) ) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, DeleteRowsEvent): - template ='DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, - ' AND '.join(map(compare_items, row['values'].items())) - ) + elif isinstance(binlog_event, DeleteRowsEvent): + template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( + binlog_event.schema, binlog_event.table, ' AND '.join(map(compare_items, row['values'].items()))) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, UpdateRowsEvent): + elif isinstance(binlog_event, UpdateRowsEvent): template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, - ', '.join(['`%s`=%%s'%k for k in row['after_values'].keys()]), + binlog_event.schema, binlog_event.table, + ', '.join(['`%s`=%%s' % k for k in row['after_values'].keys()]), ' AND '.join(map(compare_items, row['before_values'].items())) ) values = map(fix_object, row['after_values'].values()+row['before_values'].values()) - return {'template':template, 'values':values} + return {'template': template, 'values': values} -def reversed_lines(file): - "Generate the lines of file in reverse order." + +def reversed_lines(fin): + """Generate the lines of file in reverse order.""" part = '' - for block in reversed_blocks(file): + for block in reversed_blocks(fin): for c in reversed(block): if c == '\n' and part: yield part[::-1] part = '' part += c - if part: yield part[::-1] + if part: + yield part[::-1] + -def reversed_blocks(file, blocksize=4096): - "Generate blocks of file's contents in reverse order." 
- file.seek(0, os.SEEK_END) - here = file.tell() +def reversed_blocks(fin, block_size=4096): + """Generate blocks of file's contents in reverse order.""" + fin.seek(0, os.SEEK_END) + here = fin.tell() while 0 < here: - delta = min(blocksize, here) + delta = min(block_size, here) here -= delta - file.seek(here, os.SEEK_SET) - yield file.read(delta) + fin.seek(here, os.SEEK_SET) + yield fin.read(delta) diff --git a/tests/test_binlog2sql_util.py b/tests/test_binlog2sql_util.py index a3b740a..34b61be 100755 --- a/tests/test_binlog2sql_util.py +++ b/tests/test_binlog2sql_util.py @@ -4,14 +4,6 @@ import sys import unittest import mock -from pymysql.cursors import Cursor -from pymysql.connections import Connection -from pymysqlreplication.event import BinLogEvent -from pymysqlreplication.row_event import ( - WriteRowsEvent, - UpdateRowsEvent, - DeleteRowsEvent, -) sys.path.append("..") from binlog2sql.binlog2sql_util import * @@ -38,13 +30,13 @@ def test_create_unique_file(self, mock_path): def test_command_line_args(self): try: - command_line_args([]) + command_line_args(['--flashback', '--no-primary-key']) except Exception as e: - self.assertEqual(str(e), "Lack of parameter: startFile") + self.assertEqual(str(e), "Lack of parameter: start_file") try: command_line_args(['--start-file', 'mysql-bin.000058', '--flashback', '--no-primary-key']) except Exception as e: - self.assertEqual(str(e), "Only one of flashback or nopk can be True") + self.assertEqual(str(e), "Only one of flashback or no_pk can be True") try: command_line_args(['--start-file', 'mysql-bin.000058', '--flashback', '--stop-never']) except Exception as e: @@ -63,35 +55,43 @@ def test_fix_object(self): self.assertEqual(fix_object(u'unicode'), u'unicode'.encode('utf-8')) def test_generate_sql_pattern(self): - row = {'values':{'data':'hello','id':1}} + row = {'values': {'data': 'hello', 'id': 1}} mock_write_event = mock.create_autospec(WriteRowsEvent) mock_write_event.schema = 'test' mock_write_event.table = 'tbl' mock_write_event.primary_key = 'id' - pattern = generate_sql_pattern(binlogevent=mock_write_event, row=row, flashback=False, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) - pattern = generate_sql_pattern(binlogevent=mock_write_event, row=row, flashback=True, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) - pattern = generate_sql_pattern(binlogevent=mock_write_event, row=row, flashback=False, nopk=True) + pattern = generate_sql_pattern(binlog_event=mock_write_event, row=row, flashback=False, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) + pattern = generate_sql_pattern(binlog_event=mock_write_event, row=row, flashback=True, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_write_event, row=row, flashback=False, no_pk=True) self.assertEqual(pattern, {'values': ['hello'], 'template': 'INSERT INTO `test`.`tbl`(`data`) VALUES (%s);'}) row = {'values':{'data':'hello','id':1}} mock_delete_event = mock.create_autospec(DeleteRowsEvent) mock_delete_event.schema = 'test' mock_delete_event.table = 'tbl' - pattern = generate_sql_pattern(binlogevent=mock_delete_event, row=row, flashback=False, nopk=False) - 
self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) - pattern = generate_sql_pattern(binlogevent=mock_delete_event, row=row, flashback=True, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) + pattern = generate_sql_pattern(binlog_event=mock_delete_event, row=row, flashback=False, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_delete_event, row=row, flashback=True, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) - row = {'before_values':{'data':'hello','id':1}, 'after_values':{'data':'binlog2sql','id':1}} + row = {'before_values': {'data': 'hello', 'id': 1}, 'after_values': {'data': 'binlog2sql', 'id': 1}} mock_update_event = mock.create_autospec(UpdateRowsEvent) mock_update_event.schema = 'test' mock_update_event.table = 'tbl' - pattern = generate_sql_pattern(binlogevent=mock_update_event, row=row, flashback=False, nopk=False) - self.assertEqual(pattern, {'values': ['binlog2sql', 1, 'hello', 1], 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND `id`=%s LIMIT 1;'}) - pattern = generate_sql_pattern(binlogevent=mock_update_event, row=row, flashback=True, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1, 'binlog2sql', 1], 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_update_event, row=row, flashback=False, no_pk=False) + self.assertEqual(pattern, {'values': ['binlog2sql', 1, 'hello', 1], + 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND' + ' `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_update_event, row=row, flashback=True, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1, 'binlog2sql', 1], + 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND' + ' `id`=%s LIMIT 1;'}) if __name__ == '__main__': From 9a1464228111ff7378a838bc29af2543bbb321ca Mon Sep 17 00:00:00 2001 From: dfcao Date: Fri, 15 Dec 2017 10:31:16 +0800 Subject: [PATCH 15/22] support python3 --- .gitignore | 3 +++ README.md | 2 +- binlog2sql/binlog2sql.py | 46 +++++++++++----------------------- binlog2sql/binlog2sql_util.py | 47 ++++++++++++++++++++++++++++++----- 4 files changed, 60 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index 72364f9..4c9b811 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +*~ +.idea/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/README.md b/README.md index b63ead1..b618c4d 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ binlog2sql 正常维护。应用于大众点评线上环境。线上环境的操作,请在对MySQL**相当熟悉**的同学指导下进行 * 已测试环境 - * Python 2.6, 2.7 + * Python 2.6, 2.7, 3.4 * MySQL 5.6 diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index 89e641f..e486b02 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os import sys import datetime import pymysql @@ -12,7 +11,8 @@ DeleteRowsEvent, ) from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent -from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, 
create_unique_file, reversed_lines +from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, \ + temp_open, print_rollback_sql class Binlog2sql(object): @@ -35,7 +35,7 @@ def __init__(self, connection_settings, start_file=None, start_pos=None, end_fil if start_time: self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") else: - self.start_time = datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") + self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") if stop_time: self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S") else: @@ -69,19 +69,21 @@ def process_binlog(self): log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, only_tables=self.only_tables, resume_stream=True) - cursor = self.connection.cursor() - # to simplify code, we do not use flock for tmp_file. - tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port'])) - f_tmp = open(tmp_file, "w") flag_last_event = False e_start_pos, last_pos = stream.log_pos, stream.log_pos - try: + # to simplify code, we do not use flock for tmp_file. + tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port'])) + with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor: for binlog_event in stream: if not self.stop_never: + try: + event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp) + except OSError: + event_time = datetime.datetime(1980, 1, 1, 0, 0) if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \ (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos): flag_last_event = True - elif datetime.datetime.fromtimestamp(binlog_event.timestamp) < self.start_time: + elif event_time < self.start_time: if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)): last_pos = binlog_event.packet.log_pos @@ -89,7 +91,7 @@ def process_binlog(self): elif (stream.log_file not in self.binlogList) or \ (self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \ (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \ - (datetime.datetime.fromtimestamp(binlog_event.timestamp) >= self.stop_time): + (event_time >= self.stop_time): break # else: # raise ValueError('unknown binlog file or position') @@ -116,36 +118,18 @@ def process_binlog(self): last_pos = binlog_event.packet.log_pos if flag_last_event: break - f_tmp.close() + stream.close() + f_tmp.close() if self.flashback: - self.print_rollback_sql(filename=tmp_file) - finally: - os.remove(tmp_file) - cursor.close() - stream.close() + print_rollback_sql(filename=tmp_file) return True - @staticmethod - def print_rollback_sql(filename): - """print rollback sql from tmp_file""" - with open(filename) as f_tmp: - sleep_interval = 1000 - i = 0 - for line in reversed_lines(f_tmp): - print(line.rstrip()) - if i >= sleep_interval: - print('SELECT SLEEP(1);') - i = 0 - else: - i += 1 - def __del__(self): pass if __name__ == '__main__': - args = command_line_args(sys.argv[1:]) conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'} binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index b10a7e1..d8c8330 100755 --- a/binlog2sql/binlog2sql_util.py +++ 
b/binlog2sql/binlog2sql_util.py @@ -5,6 +5,7 @@ import sys import argparse import datetime +from contextlib import contextmanager from pymysqlreplication.row_event import ( WriteRowsEvent, UpdateRowsEvent, @@ -12,6 +13,11 @@ ) from pymysqlreplication.event import QueryEvent +if sys.version > '3': + PY3PLUS = True +else: + PY3PLUS = False + def is_valid_datetime(string): try: @@ -33,6 +39,16 @@ def create_unique_file(filename): return result_file +@contextmanager +def temp_open(filename, mode): + f = open(filename, mode) + try: + yield f + finally: + f.close() + os.remove(filename) + + def parse_args(): """parse args for binlog2sql""" @@ -105,8 +121,9 @@ def command_line_args(args): return args -def compare_items((k, v)): - #caution: if v is NULL, may need to process +def compare_items(items): + # caution: if v is NULL, may need to process + (k, v) = items if v is None: return '`%s` IS %%s' % k else: @@ -115,7 +132,9 @@ def compare_items((k, v)): def fix_object(value): """Fixes python objects so that they can be properly inserted into SQL queries""" - if isinstance(value, unicode): + if PY3PLUS and isinstance(value, bytes): + return value.decode('utf-8') + elif not PY3PLUS and isinstance(value, unicode): return value.encode('utf-8') else: return value @@ -166,7 +185,7 @@ def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False): binlog_event.schema, binlog_event.table, ', '.join(['`%s`=%%s' % x for x in row['before_values'].keys()]), ' AND '.join(map(compare_items, row['after_values'].items()))) - values = map(fix_object, row['before_values'].values()+row['after_values'].values()) + values = map(fix_object, list(row['before_values'].values())+list(row['after_values'].values())) else: if isinstance(binlog_event, WriteRowsEvent): if no_pk: @@ -193,15 +212,31 @@ def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False): ', '.join(['`%s`=%%s' % k for k in row['after_values'].keys()]), ' AND '.join(map(compare_items, row['before_values'].items())) ) - values = map(fix_object, row['after_values'].values()+row['before_values'].values()) + values = map(fix_object, list(row['after_values'].values())+list(row['before_values'].values())) + + return {'template': template, 'values': list(values)} + - return {'template': template, 'values': values} +def print_rollback_sql(filename): + """print rollback sql from tmp_file""" + with open(filename, "rb") as f_tmp: + sleep_interval = 1000 + i = 0 + for line in reversed_lines(f_tmp): + print(line.rstrip()) + if i >= sleep_interval: + print('SELECT SLEEP(1);') + i = 0 + else: + i += 1 def reversed_lines(fin): """Generate the lines of file in reverse order.""" part = '' for block in reversed_blocks(fin): + if PY3PLUS: + block = block.decode("utf-8") for c in reversed(block): if c == '\n' and part: yield part[::-1] From 1201a85406a4e552bf04e029901913b930cfa35b Mon Sep 17 00:00:00 2001 From: dfcao Date: Fri, 15 Dec 2017 11:43:23 +0800 Subject: [PATCH 16/22] add rollback interval --- README.md | 7 +++++-- binlog2sql/binlog2sql.py | 27 +++++++++++++++++++++------ binlog2sql/binlog2sql_util.py | 27 ++++++--------------------- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index b618c4d..0bb46e3 100644 --- a/README.md +++ b/README.md @@ -92,11 +92,13 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` -K, --no-primary-key 对INSERT语句去除主键。可选。 --B, --flashback 生成回滚语句,可解析大文件,不受内存限制,每打印一千行加一句SELECT SLEEP(1)。可选。与stop-never或no-primary-key不能同时添加。 +-B, --flashback 
生成回滚语句,可解析大文件,不受内存限制。可选。与stop-never或no-primary-key不能同时添加。 + +--back-interval -B模式下,每打印一千行回滚语句,加一句SLEEP多少秒,如果不需要SLEEP,请设为0。可选。默认1.0。 **解析范围控制** ---start-file 起始解析文件。必须。 +--start-file 起始解析文件,只需文件名,无需全路径 。必须。 --start-position/--start-pos start-file的起始解析位置。可选。默认为start-file的起始位置。 @@ -114,6 +116,7 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` -t, --tables 只输出目标tables的sql。可选。默认为空。 + ### 应用案例 #### **误删整张表数据,需要紧急回滚** diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index e486b02..97bb171 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -11,15 +11,14 @@ DeleteRowsEvent, ) from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent -from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, \ - temp_open, print_rollback_sql +from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, temp_open, reversed_lines class Binlog2sql(object): def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None, start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False, - flashback=False, stop_never=False): + flashback=False, stop_never=False, back_interval=1.0): """ conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'} """ @@ -43,7 +42,7 @@ def __init__(self, connection_settings, start_file=None, start_pos=None, end_fil self.only_schemas = only_schemas if only_schemas else None self.only_tables = only_tables if only_tables else None - self.no_pk, self.flashback, self.stop_never = (no_pk, flashback, stop_never) + self.no_pk, self.flashback, self.stop_never, self.back_interval = (no_pk, flashback, stop_never, back_interval) self.binlogList = [] self.connection = pymysql.connect(**self.conn_setting) @@ -122,9 +121,24 @@ def process_binlog(self): stream.close() f_tmp.close() if self.flashback: - print_rollback_sql(filename=tmp_file) + self.print_rollback_sql(filename=tmp_file) return True + def print_rollback_sql(self, filename): + """print rollback sql from tmp_file""" + with open(filename, "rb") as f_tmp: + batch_size = 1000 + i = 0 + for line in reversed_lines(f_tmp): + print(line.rstrip()) + if i >= batch_size: + i = 0 + if self.back_interval: + print('SELECT SLEEP(%s);' % self.back_interval) + else: + i += 1 + + def __del__(self): pass @@ -135,5 +149,6 @@ def __del__(self): binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time, stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables, - no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never) + no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never, + back_interval=args.back_interval) binlog2sql.process_binlog() diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index d8c8330..6fedbf5 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -78,14 +78,13 @@ def parse_args(): "shell to set it properly).", default='') interval.add_argument('--stop-datetime', dest='stop_time', type=str, help="Stop reading the binlog at first event having a datetime equal or posterior " - "to the argument; the argument must be a date and time in the local time zone," - " in any format accepted by the MySQL server for DATETIME and TIMESTAMP types," - " for example: 2004-12-25 
11:25:56 (you should probably use quotes for your " - "shell to set it properly).", default='') + "to the argument;", default='') parser.add_argument('--stop-never', dest='stop_never', action='store_true', - help="Wait for more data from the server. default: stop replicate at the last binlog" - " when you start binlog2sql", default=False) - + help="Wait for more data from the server. default: stop replicate at the last binlog " + "when you start binlog2sql", default=False) + parser.add_argument('--back-interval', dest='back_interval', type=float, + help="Sleep time between chunks of 1000 rollback sql. If you do not need sleep between chunks, " + " set it to 0", default=1.0) parser.add_argument('--help', dest='help', action='store_true', help='help information', default=False) schema = parser.add_argument_group('schema filter') @@ -217,20 +216,6 @@ def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False): return {'template': template, 'values': list(values)} -def print_rollback_sql(filename): - """print rollback sql from tmp_file""" - with open(filename, "rb") as f_tmp: - sleep_interval = 1000 - i = 0 - for line in reversed_lines(f_tmp): - print(line.rstrip()) - if i >= sleep_interval: - print('SELECT SLEEP(1);') - i = 0 - else: - i += 1 - - def reversed_lines(fin): """Generate the lines of file in reverse order.""" part = '' From 4369cc0ebe57a166516010210d52b0dfd7765bfd Mon Sep 17 00:00:00 2001 From: dfcao Date: Fri, 15 Dec 2017 16:17:16 +0800 Subject: [PATCH 17/22] support filter by sql type and dml type --- README.md | 29 ++++++++++++----------- binlog2sql/binlog2sql.py | 20 +++++++--------- binlog2sql/binlog2sql_util.py | 43 ++++++++++++++++++++++++----------- 3 files changed, 54 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 0bb46e3..d957c7c 100644 --- a/README.md +++ b/README.md @@ -88,34 +88,37 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` **解析模式** ---stop-never 持续同步binlog。可选。不加则同步至执行命令时最新的binlog位置。 +--stop-never 持续解析binlog。可选。,默认False,同步至执行命令时最新的binlog位置。 --K, --no-primary-key 对INSERT语句去除主键。可选。 +-K, --no-primary-key 对INSERT语句去除主键。可选。默认False --B, --flashback 生成回滚语句,可解析大文件,不受内存限制。可选。与stop-never或no-primary-key不能同时添加。 +-B, --flashback 生成回滚SQL,可解析大文件,不受内存限制。可选。默认False。与stop-never或no-primary-key不能同时添加。 ---back-interval -B模式下,每打印一千行回滚语句,加一句SLEEP多少秒,如果不需要SLEEP,请设为0。可选。默认1.0。 +--back-interval -B模式下,每打印一千行回滚SQL,加一句SLEEP多少秒,如不想加SLEEP,请设为0。可选。默认1.0。 **解析范围控制** --start-file 起始解析文件,只需文件名,无需全路径 。必须。 ---start-position/--start-pos start-file的起始解析位置。可选。默认为start-file的起始位置。 +--start-position/--start-pos 起始解析位置。可选。默认为start-file的起始位置。 ---stop-file/--end-file 末尾解析文件。可选。默认为start-file同一个文件。若解析模式为stop-never,此选项失效。 +--stop-file/--end-file 终止解析文件。可选。默认为start-file同一个文件。若解析模式为stop-never,此选项失效。 ---stop-position/--end-pos stop-file的末尾解析位置。可选。默认为stop-file的最末位置;若解析模式为stop-never,此选项失效。 +--stop-position/--end-pos 终止解析位置。可选。默认为stop-file的最末位置;若解析模式为stop-never,此选项失效。 ---start-datetime 从哪个时间点的binlog开始解析,格式必须为datetime,如'2016-11-11 11:11:11'。可选。默认不过滤。 +--start-datetime 起始解析时间,格式'%Y-%m-%d %H:%M:%S'。可选。默认不过滤。 ---stop-datetime 到哪个时间点的binlog停止解析,格式必须为datetime,如'2016-11-11 11:11:11'。可选。默认不过滤。 +--stop-datetime 终止解析时间,格式'%Y-%m-%d %H:%M:%S'。可选。默认不过滤。 **对象过滤** --d, --databases 只输出目标db的sql。可选。默认为空。 +-d, --databases 只解析目标db的sql,多个库用空格隔开,如-d db1 db2。可选。默认为空。 --t, --tables 只输出目标tables的sql。可选。默认为空。 +-t, --tables 只解析目标table的sql,多张表用空格隔开,如-t tbl1 tbl2。可选。默认为空。 +--only-dml 只解析dml,忽略ddl。可选。默认TRUE。 + +--sql-type 只解析指定类型,支持INSERT, UPDATE, 
DELETE。多个类型用空格隔开,如--sql-type INSERT DELETE。可选。默认为增删改都解析。用了此参数但没填任何类型,则三者都不解析。 ### 应用案例 @@ -209,12 +212,12 @@ Empty set (0.00 sec) * 纯Python开发,安装与使用都很简单 * 自带flashback、no-primary-key解析模式,无需再装补丁 * flashback模式下,更适合闪回[实战](./example/mysql-flashback-priciple-and-practice.md) -* 解析为标准SQL,方便理解、调试 +* 解析为标准SQL,方便理解、筛选 * 代码容易改造,可以支持更多个性化解析 ### 贡献者 -* danfengcao 维护者 [https://github.com/danfengcao](https://github.com/danfengcao) +* danfengcao 作者,维护者 [https://github.com/danfengcao](https://github.com/danfengcao) * 大众点评DBA团队 想法交流,使用体验 [dba_op@dianping.com](dba_op@dianping.com) * 赵承勇 pymysqlreplication权限bug [https://github.com/imzcy1987](https://github.com/imzcy1987) * 陈路炳 bug报告(字段值为空时的处理),使用体验 [https://github.com/bingluchen](https://github.com/bingluchen) diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index 97bb171..0120837 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -5,20 +5,16 @@ import datetime import pymysql from pymysqlreplication import BinLogStreamReader -from pymysqlreplication.row_event import ( - WriteRowsEvent, - UpdateRowsEvent, - DeleteRowsEvent, -) from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent -from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, temp_open, reversed_lines +from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, temp_open, \ + reversed_lines, is_dml_event, event_type class Binlog2sql(object): def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None, start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False, - flashback=False, stop_never=False, back_interval=1.0): + flashback=False, stop_never=False, back_interval=1.0, only_dml=True, sql_type=None): """ conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'} """ @@ -43,6 +39,8 @@ def __init__(self, connection_settings, start_file=None, start_pos=None, end_fil self.only_schemas = only_schemas if only_schemas else None self.only_tables = only_tables if only_tables else None self.no_pk, self.flashback, self.stop_never, self.back_interval = (no_pk, flashback, stop_never, back_interval) + self.only_dml = only_dml + self.sql_type = [t.upper() for t in sql_type] if sql_type else [] self.binlogList = [] self.connection = pymysql.connect(**self.conn_setting) @@ -98,13 +96,12 @@ def process_binlog(self): if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN': e_start_pos = last_pos - if isinstance(binlog_event, QueryEvent): + if isinstance(binlog_event, QueryEvent) and not self.only_dml: sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, flashback=self.flashback, no_pk=self.no_pk) if sql: print(sql) - elif isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) or\ - isinstance(binlog_event, DeleteRowsEvent): + elif is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type: for row in binlog_event.rows: sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk, row=row, flashback=self.flashback, e_start_pos=e_start_pos) @@ -138,7 +135,6 @@ def print_rollback_sql(self, filename): else: i += 1 - def __del__(self): pass @@ -150,5 +146,5 @@ def __del__(self): end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time, stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables, no_pk=args.no_pk, 
flashback=args.flashback, stop_never=args.stop_never, - back_interval=args.back_interval) + back_interval=args.back_interval, only_dml=args.only_dml, sql_type=args.sql_type) binlog2sql.process_binlog() diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 6fedbf5..4b57aaf 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -71,20 +71,11 @@ def parse_args(): interval.add_argument('--stop-position', '--end-pos', dest='end_pos', type=int, help="Stop position. default: latest position of '--stop-file'", default=0) interval.add_argument('--start-datetime', dest='start_time', type=str, - help="Start reading the binlog at first event having a datetime equal or posterior " - "to the argument; the argument must be a date and time in the local time zone," - " in any format accepted by the MySQL server for DATETIME and TIMESTAMP types," - " for example: 2004-12-25 11:25:56 (you should probably use quotes for your " - "shell to set it properly).", default='') + help="Start time. format %%Y-%%m-%%d %%H:%%M:%%S", default='') interval.add_argument('--stop-datetime', dest='stop_time', type=str, - help="Stop reading the binlog at first event having a datetime equal or posterior " - "to the argument;", default='') - parser.add_argument('--stop-never', dest='stop_never', action='store_true', - help="Wait for more data from the server. default: stop replicate at the last binlog " - "when you start binlog2sql", default=False) - parser.add_argument('--back-interval', dest='back_interval', type=float, - help="Sleep time between chunks of 1000 rollback sql. If you do not need sleep between chunks, " - " set it to 0", default=1.0) + help="Stop Time. format %%Y-%%m-%%d %%H:%%M:%%S;", default='') + parser.add_argument('--stop-never', dest='stop_never', action='store_true', default=False, + help="Continuously parse binlog. default: stop at the latest event when you start.") parser.add_argument('--help', dest='help', action='store_true', help='help information', default=False) schema = parser.add_argument_group('schema filter') @@ -93,11 +84,19 @@ def parse_args(): schema.add_argument('-t', '--tables', dest='tables', type=str, nargs='*', help='tables you want to process', default='') + event = parser.add_argument_group('type filter') + event.add_argument('--only-dml', dest='only_dml', action='store_true', default=True, + help='only print dml, ignore ddl') + event.add_argument('--sql-type', dest='sql_type', type=str, nargs='*', default=['INSERT', 'UPDATE', 'DELETE'], + help='Sql type you want to process, support INSERT, UPDATE, DELETE.') + # exclusive = parser.add_mutually_exclusive_group() parser.add_argument('-K', '--no-primary-key', dest='no_pk', action='store_true', help='Generate insert sql without primary key if exists', default=False) parser.add_argument('-B', '--flashback', dest='flashback', action='store_true', help='Flashback data to start_position of start_file', default=False) + parser.add_argument('--back-interval', dest='back_interval', type=float, default=1.0, + help="Sleep time between chunks of 1000 rollback sql. 
set it to 0 if do not need sleep") return parser @@ -139,6 +138,24 @@ def fix_object(value): return value +def is_dml_event(event): + if isinstance(event, WriteRowsEvent) or isinstance(event, UpdateRowsEvent) or isinstance(event, DeleteRowsEvent): + return True + else: + return False + + +def event_type(event): + t = None + if isinstance(event, WriteRowsEvent): + t = 'INSERT' + elif isinstance(event, UpdateRowsEvent): + t = 'UPDATE' + elif isinstance(event, DeleteRowsEvent): + t = 'DELETE' + return t + + def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, flashback=False, no_pk=False): if flashback and no_pk: raise ValueError('only one of flashback or no_pk can be True') From 9611b80731d577aa41572f557144054a970155af Mon Sep 17 00:00:00 2001 From: danfengcao Date: Thu, 4 Jan 2018 20:31:47 +0800 Subject: [PATCH 18/22] little fix --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d957c7c..f785b72 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ binlog2sql 项目状态 === -正常维护。应用于大众点评线上环境。线上环境的操作,请在对MySQL**相当熟悉**的同学指导下进行 +正常维护。应用于部分公司线上环境。线上环境的操作,请在对MySQL**相当熟悉**的同学指导下进行 * 已测试环境 * Python 2.6, 2.7, 3.4 @@ -211,14 +211,14 @@ Empty set (0.00 sec) * 纯Python开发,安装与使用都很简单 * 自带flashback、no-primary-key解析模式,无需再装补丁 -* flashback模式下,更适合闪回[实战](./example/mysql-flashback-priciple-and-practice.md) +* flashback模式下,更适合[闪回实战](./example/mysql-flashback-priciple-and-practice.md) * 解析为标准SQL,方便理解、筛选 * 代码容易改造,可以支持更多个性化解析 ### 贡献者 * danfengcao 作者,维护者 [https://github.com/danfengcao](https://github.com/danfengcao) -* 大众点评DBA团队 想法交流,使用体验 [dba_op@dianping.com](dba_op@dianping.com) +* 大众点评DBA团队 想法交流,使用体验 * 赵承勇 pymysqlreplication权限bug [https://github.com/imzcy1987](https://github.com/imzcy1987) * 陈路炳 bug报告(字段值为空时的处理),使用体验 [https://github.com/bingluchen](https://github.com/bingluchen) From 3f328625e3773171c85f6a7380a3de92cdcc9506 Mon Sep 17 00:00:00 2001 From: dfcao Date: Thu, 15 Mar 2018 23:08:17 +0800 Subject: [PATCH 19/22] fix: set --only-dml default false --- README.md | 2 +- binlog2sql/binlog2sql_util.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f785b72..4adee23 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` -t, --tables 只解析目标table的sql,多张表用空格隔开,如-t tbl1 tbl2。可选。默认为空。 ---only-dml 只解析dml,忽略ddl。可选。默认TRUE。 +--only-dml 只解析dml,忽略ddl。可选。默认False。 --sql-type 只解析指定类型,支持INSERT, UPDATE, DELETE。多个类型用空格隔开,如--sql-type INSERT DELETE。可选。默认为增删改都解析。用了此参数但没填任何类型,则三者都不解析。 diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 4b57aaf..4c2f5f6 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -85,7 +85,7 @@ def parse_args(): help='tables you want to process', default='') event = parser.add_argument_group('type filter') - event.add_argument('--only-dml', dest='only_dml', action='store_true', default=True, + event.add_argument('--only-dml', dest='only_dml', action='store_true', default=False, help='only print dml, ignore ddl') event.add_argument('--sql-type', dest='sql_type', type=str, nargs='*', default=['INSERT', 'UPDATE', 'DELETE'], help='Sql type you want to process, support INSERT, UPDATE, DELETE.') From fdb3fbd3d7138b9d6b9ce4744b0a886e2df25053 Mon Sep 17 00:00:00 2001 From: dfcao Date: Wed, 30 May 2018 12:28:17 +0800 Subject: [PATCH 20/22] fix: process set field type --- README.md | 16 +++++++++------- binlog2sql/binlog2sql_util.py | 2 ++ 2 files 
changed, 11 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 4adee23..c58af05 100644 --- a/README.md +++ b/README.md @@ -13,11 +13,11 @@ binlog2sql 项目状态 === -正常维护。应用于部分公司线上环境。线上环境的操作,请在对MySQL**相当熟悉**的同学指导下进行 +正常维护。应用于部分公司线上环境。 * 已测试环境 - * Python 2.6, 2.7, 3.4 - * MySQL 5.6 + * Python 2.7, 3.4+ + * MySQL 5.6, 5.7 安装 @@ -217,14 +217,16 @@ Empty set (0.00 sec) ### 贡献者 -* danfengcao 作者,维护者 [https://github.com/danfengcao](https://github.com/danfengcao) +* [danfengcao](https://github.com/danfengcao) 作者,维护者 [https://github.com/danfengcao] * 大众点评DBA团队 想法交流,使用体验 -* 赵承勇 pymysqlreplication权限bug [https://github.com/imzcy1987](https://github.com/imzcy1987) -* 陈路炳 bug报告(字段值为空时的处理),使用体验 [https://github.com/bingluchen](https://github.com/bingluchen) +* [赵承勇](https://github.com/imzcy1987) pymysqlreplication权限bug #2 +* [陈路炳](https://github.com/bingluchen) bug报告(字段值为空时的处理),使用体验 +* [dba-jane](https://github.com/DBA-jane) pymysqlreplication时间字段浮点数bug #29 +* [lujinke](https://github.com/lujinke) bug报告(set字段的处理 #32) ### 联系我 -有任何问题,请与我联系。微信:danfeng053005 邮箱:[danfengcao.info@gmail.com](danfengcao.info@gmail.com) +有任何问题,请与我联系。邮箱:[danfengcao.info@gmail.com](danfengcao.info@gmail.com) 欢迎提问题提需求,欢迎pull requests! diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 4c2f5f6..8c12ec7 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -130,6 +130,8 @@ def compare_items(items): def fix_object(value): """Fixes python objects so that they can be properly inserted into SQL queries""" + if isinstance(value, set): + value = ','.join(value) if PY3PLUS and isinstance(value, bytes): return value.decode('utf-8') elif not PY3PLUS and isinstance(value, unicode): From 42c6f5353b2cb36ef97eb73b1e111cb028be4cc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?vcdf=E6=9B=B9=E5=8D=95=E9=94=8B?= Date: Mon, 27 Aug 2018 17:56:12 +0800 Subject: [PATCH 21/22] support interactive password input --- binlog2sql/binlog2sql_util.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 8c12ec7..3a9bb31 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -5,13 +5,15 @@ import sys import argparse import datetime +import getpass from contextlib import contextmanager +from pymysqlreplication.event import QueryEvent from pymysqlreplication.row_event import ( WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, ) -from pymysqlreplication.event import QueryEvent + if sys.version > '3': PY3PLUS = True @@ -58,7 +60,7 @@ def parse_args(): help='Host the MySQL database server located', default='127.0.0.1') connect_setting.add_argument('-u', '--user', dest='user', type=str, help='MySQL Username to log in as', default='root') - connect_setting.add_argument('-p', '--password', dest='password', type=str, + connect_setting.add_argument('-p', '--password', dest='password', type=str, nargs='*', help='MySQL Password to use', default='') connect_setting.add_argument('-P', '--port', dest='port', type=int, help='MySQL port to use', default=3306) @@ -116,6 +118,10 @@ def command_line_args(args): if (args.start_time and not is_valid_datetime(args.start_time)) or \ (args.stop_time and not is_valid_datetime(args.stop_time)): raise ValueError('Incorrect datetime argument') + if not args.password: + args.password = getpass.getpass() + else: + args.password = args.password[0] return args From 5a8e65c432e74950b48b7ead28f424ec931b755d Mon Sep 17 00:00:00 2001 From: dfcao Date: Fri, 12 Oct 2018 
14:35:58 +0800 Subject: [PATCH 22/22] fix stop-never option --- README.md | 2 +- binlog2sql/binlog2sql.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c58af05..7ba25a4 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ UPDATE `test`.`test3` SET `addtime`='2016-12-10 13:03:22', `data`='中文', `id` **解析模式** ---stop-never 持续解析binlog。可选。,默认False,同步至执行命令时最新的binlog位置。 +--stop-never 持续解析binlog。可选。默认False,同步至执行命令时最新的binlog位置。 -K, --no-primary-key 对INSERT语句去除主键。可选。默认False diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index 0120837..971ec55 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -64,7 +64,7 @@ def __init__(self, connection_settings, start_file=None, start_pos=None, end_fil def process_binlog(self): stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id, log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, - only_tables=self.only_tables, resume_stream=True) + only_tables=self.only_tables, resume_stream=True, blocking=True) flag_last_event = False e_start_pos, last_pos = stream.log_pos, stream.log_pos
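
---

A note for readers reconstructing the flashback path from patches 14–16 above: the design is to write forward-order SQL to a temporary file while scanning the binlog, then stream that file back line by line in reverse, emitting a `SELECT SLEEP(back_interval);` after every 1000 rollback statements so the rollback does not hammer the server. The sketch below consolidates `reversed_blocks`, `reversed_lines` and `print_rollback_sql` as they stand after these patches; it keeps the names from the diffs but collapses the Python 2/3 branching to the Python 3 case, so treat it as an illustration rather than the exact module.

```python
import os


def reversed_blocks(fin, block_size=4096):
    """Yield blocks of a file's contents, last block first."""
    fin.seek(0, os.SEEK_END)
    here = fin.tell()
    while 0 < here:
        delta = min(block_size, here)
        here -= delta
        fin.seek(here, os.SEEK_SET)
        yield fin.read(delta)


def reversed_lines(fin):
    """Yield the lines of a file in reverse order without loading it whole."""
    part = ''
    for block in reversed_blocks(fin):
        block = block.decode('utf-8')  # file is opened in binary mode
        for c in reversed(block):
            if c == '\n' and part:
                yield part[::-1]
                part = ''
            part += c
    if part:
        yield part[::-1]


def print_rollback_sql(filename, back_interval=1.0):
    """Print rollback SQL from the temp file, throttling every 1000 statements."""
    with open(filename, 'rb') as f_tmp:
        batch_size = 1000
        i = 0
        for line in reversed_lines(f_tmp):
            print(line.rstrip())
            if i >= batch_size:
                i = 0
                if back_interval:
                    print('SELECT SLEEP(%s);' % back_interval)
            else:
                i += 1
```

Reversing the file on disk instead of in memory is what lets `--flashback` handle binlogs larger than RAM, which is why the temp file is written forward during parsing and only read backwards at the very end.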
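Similarly, the interactive password change in patch 21 relies on `nargs='*'`: `-p` may now be passed with no value (or omitted entirely), and `command_line_args` falls back to a `getpass` prompt so the password need not appear on the command line. A minimal, hypothetical stand-alone sketch of just that behaviour (the helper name `read_password` is not from the patch):

```python
import argparse
import getpass


def read_password(argv):
    """Return the MySQL password, prompting interactively if -p has no value."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('-p', '--password', dest='password', type=str, nargs='*',
                        help='MySQL Password to use', default='')
    args = parser.parse_args(argv)
    if not args.password:         # [] when "-p" is bare, '' when -p is absent
        return getpass.getpass()  # prompt without echoing
    return args.password[0]       # single value supplied on the command line


# read_password(['-p', 'admin']) -> 'admin'
# read_password(['-p']) or read_password([]) -> interactive prompt
```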