Skip to content

Commit f71436f

Browse files
committed
*: WIP.
1 parent dbb064b commit f71436f

File tree

2 files changed

+190
-1
lines changed

2 files changed

+190
-1
lines changed

cmd/go-mysqlbinlog/main.go

Lines changed: 176 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44
package main
55

66
import (
7+
"database/sql"
78
"flag"
89
"fmt"
910
"os"
11+
"reflect"
12+
"strings"
1013

14+
_ "github.com/go-sql-driver/mysql"
1115
"github.com/juju/errors"
1216
"github.com/siddontang/go-mysql/mysql"
1317
"github.com/siddontang/go-mysql/replication"
@@ -28,9 +32,139 @@ var backupPath = flag.String("backup_path", "", "backup path to store binlog fil
2832

2933
var rawMode = flag.Bool("raw", false, "Use raw mode")
3034

35+
var tableColumnsSQL = "select column_name from columns where table_schema = ? and table_name = ?"
36+
37+
func createDB(user string, password string, host string, port int, name string) (*sql.DB, error) {
38+
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", user, password, host, port, name)
39+
db, err := sql.Open("mysql", dbDSN)
40+
if err != nil {
41+
return nil, errors.Trace(err)
42+
}
43+
44+
return db, nil
45+
}
46+
47+
func closeDB(db *sql.DB) error {
48+
return errors.Trace(db.Close())
49+
}
50+
51+
func getTableColumns(db *sql.DB, schema string, table string) ([]string, error) {
52+
if table == "" {
53+
return nil, errors.New("table name is empty")
54+
}
55+
56+
var columns []string
57+
rows, err := db.Query(tableColumnsSQL, schema, table)
58+
if err != nil {
59+
return nil, errors.Trace(err)
60+
}
61+
defer rows.Close()
62+
63+
for rows.Next() {
64+
var field string
65+
err = rows.Scan(
66+
&field,
67+
)
68+
69+
if err != nil {
70+
return nil, errors.Trace(err)
71+
}
72+
73+
columns = append(columns, field)
74+
}
75+
76+
return columns, nil
77+
}
78+
79+
func genInsertSQLs(schema string, table string, datas [][]interface{}, columns []string) ([]string, error) {
80+
sqls := make([]string, 0, len(datas))
81+
columnList := strings.Join(columns, ",")
82+
for _, data := range datas {
83+
if len(data) != len(columns) {
84+
return nil, errors.Errorf("invalid columns and datas - %d, %d", len(datas), len(columns))
85+
}
86+
87+
values := make([]string, 0, len(data))
88+
for _, value := range data {
89+
values = append(values, fmt.Sprintf("%v", value))
90+
}
91+
92+
valueList := strings.Join(values, ",")
93+
sql := fmt.Sprintf("insert into %s.%s (%s) values (%s);", schema, table, columnList, valueList)
94+
sqls = append(sqls, sql)
95+
}
96+
97+
return sqls, nil
98+
}
99+
100+
func genWhere(columns []string, data []interface{}, split string) string {
101+
var kvs []byte
102+
for i := range columns {
103+
if i == len(columns)-1 {
104+
kvs = append(kvs, []byte(fmt.Sprintf("%s = %v", columns[i], data[i]))...)
105+
} else {
106+
kvs = append(kvs, []byte(fmt.Sprintf("%s = %v%s", columns[i], data[i], split))...)
107+
}
108+
}
109+
110+
return string(kvs)
111+
}
112+
113+
func genKVs(columns []string, data []interface{}, split string) string {
114+
var kvs []byte
115+
for i := range columns {
116+
if i == len(columns)-1 {
117+
kvs = append(kvs, []byte(fmt.Sprintf("%s = %v", columns[i], data[i]))...)
118+
} else {
119+
kvs = append(kvs, []byte(fmt.Sprintf("%s = %v%s", columns[i], data[i], split))...)
120+
}
121+
}
122+
123+
return string(kvs)
124+
}
125+
126+
func genUpdateSQLs(schema string, table string, datas [][]interface{}, columns []string) ([]string, error) {
127+
sqls := make([]string, 0, len(datas)/2)
128+
for i := 0; i < len(datas); i += 2 {
129+
oldData := datas[i]
130+
newData := datas[i+1]
131+
if len(oldData) != len(newData) {
132+
return nil, errors.Errorf("invalid update datas - %d, %d", len(oldData), len(newData))
133+
}
134+
135+
oldValues := make([]string, 0, len(oldData))
136+
newValues := make([]string, 0, len(newData))
137+
updateColumns := make([]string, 0, len(columns))
138+
139+
for j := range oldData {
140+
if reflect.DeepEqual(oldData[j], newData[j]) {
141+
continue
142+
}
143+
144+
updateColumns = append(updateColumns, columns[j])
145+
oldValues = append(oldValues, fmt.Sprintf("%v", oldData[j]))
146+
newValues = append(newValues, fmt.Sprintf("%v", newData[j]))
147+
kvs := genKVs(updateColumns, newData, ", ")
148+
where := genWhere(updateColumns, oldData, " and ")
149+
sql := fmt.Sprintf("update %s.%s set %s where %s;", schema, table, kvs, where)
150+
sqls = append(sqls, sql)
151+
}
152+
}
153+
154+
return sqls, nil
155+
}
156+
31157
func main() {
32158
flag.Parse()
33159

160+
db, err := createDB(*user, *password, *host, *port, "information_schema")
161+
if err != nil {
162+
fmt.Printf("create mysql connection failed: %v\n", errors.ErrorStack(err))
163+
return
164+
}
165+
166+
defer closeDB(db)
167+
34168
b := replication.NewBinlogSyncer(101, *flavor)
35169

36170
if err := b.RegisterSlave(*host, uint16(*port), *user, *password); err != nil {
@@ -71,8 +205,49 @@ func main() {
71205
return
72206
}
73207

208+
switch ev := e.Event.(type) {
209+
case *replication.RowsEvent:
210+
switch e.Header.EventType {
211+
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
212+
schema := string(ev.Table.Schema)
213+
table := string(ev.Table.Table)
214+
columns, err := getTableColumns(db, schema, table)
215+
if err != nil {
216+
fmt.Printf("parse rows event failed: %v\n", errors.ErrorStack(err))
217+
return
218+
}
219+
220+
sqls, err := genInsertSQLs(schema, table, ev.Rows, columns)
221+
if err != nil {
222+
fmt.Printf("gen insert sqls failed: %v\n", errors.ErrorStack(err))
223+
return
224+
}
225+
226+
for i, sql := range sqls {
227+
fmt.Printf("[insert]%d - %s\n", i, sql)
228+
}
229+
case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
230+
schema := string(ev.Table.Schema)
231+
table := string(ev.Table.Table)
232+
columns, err := getTableColumns(db, schema, table)
233+
if err != nil {
234+
fmt.Printf("parse rows event failed: %v\n", errors.ErrorStack(err))
235+
return
236+
}
237+
238+
sqls, err := genUpdateSQLs(schema, table, ev.Rows, columns)
239+
if err != nil {
240+
fmt.Printf("gen insert sqls failed: %v\n", errors.ErrorStack(err))
241+
return
242+
}
243+
244+
for i, sql := range sqls {
245+
fmt.Printf("[update]%d - %s\n", i, sql)
246+
}
247+
}
248+
}
249+
74250
e.Dump(os.Stdout)
75251
}
76252
}
77-
78253
}

replication/row_event.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ type TableMapEvent struct {
3232
NullBitmap []byte
3333
}
3434

35+
func (e *TableMapEvent) String() string {
36+
if e == nil {
37+
return "<nil>"
38+
}
39+
return fmt.Sprintf("TableMapEvent(%+v)", *e)
40+
}
41+
3542
func (e *TableMapEvent) Decode(data []byte) error {
3643
pos := 0
3744
e.TableID = FixedLengthInt(data[0:e.tableIDSize])
@@ -219,6 +226,13 @@ type RowsEvent struct {
219226
Rows [][]interface{}
220227
}
221228

229+
func (e *RowsEvent) String() string {
230+
if e == nil {
231+
return "<nil>"
232+
}
233+
return fmt.Sprintf("RowsEvent(%+v)", *e)
234+
}
235+
222236
func (e *RowsEvent) Decode(data []byte) error {
223237
pos := 0
224238
e.TableID = FixedLengthInt(data[0:e.tableIDSize])

0 commit comments

Comments
 (0)