Skip to content

Commit 54eaf45

Browse files
committed
WL#7640 Ndb Active Active Delete-Delete handling
Implement mechanisms for extra signalling in the Binlog... - New optional conflict flags, transported by v2 Binlog events in the Binlog extra info area. - Two values currently : Refresh_op and Reflect_op - New reserved AnyValue codes, transferred through a cluster from NdbApi -> NdbApi event Api. - Test to show that the Binlog flag setting works The new signalling is not yet used.
1 parent 0669e84 commit 54eaf45

9 files changed

+451
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use test;
2+
reset master;
3+
create table t1 (a int primary key, b int) engine=ndb;
4+
Low level transport test
5+
set global debug='+d,ndb_injector_set_event_conflict_flags';
6+
Two inserts to the same table, with nothing else in the epoch,
7+
will be grouped together in one binlog event, as the
8+
extra row info is the same.
9+
insert into test.t1 values (1,1), (2,2);
10+
A separate delete.
11+
delete from test.t1 where a=2;
12+
txt
13+
### Extra row data format: 0, len: 4 :0x0200FAFA
14+
### Extra row data format: 0, len: 4 :0x0200FAFA
15+
set global debug='';
16+
delete from test.t1;
17+
reset master;
18+
Specific flag settings test : Reflect
19+
set global debug='+d,ndb_set_reflect_anyvalue';
20+
Two inserts to the same table, with nothing else in the epoch,
21+
will be grouped together in one binlog event, as the
22+
extra row info is the same.
23+
insert into test.t1 values (1,1), (2,2);
24+
A separate delete.
25+
delete from test.t1 where a=2;
26+
txt
27+
### Extra row data format: 0, len: 4 :0x02000100
28+
### Extra row data format: 0, len: 4 :0x02000100
29+
set global debug='';
30+
delete from test.t1;
31+
reset master;
32+
Specific flag settings test : Refresh
33+
set global debug='+d,ndb_set_refresh_anyvalue';
34+
Two inserts to the same table, with nothing else in the epoch,
35+
will be grouped together in one binlog event, as the
36+
extra row info is the same.
37+
insert into test.t1 values (1,1), (2,2);
38+
A separate delete.
39+
delete from test.t1 where a=2;
40+
txt
41+
### Extra row data format: 0, len: 4 :0x02000200
42+
### Extra row data format: 0, len: 4 :0x02000200
43+
set global debug='';
44+
drop table t1;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
--source include/have_ndb.inc
2+
--source include/have_log_bin.inc
3+
--source include/have_debug.inc
4+
5+
use test;
6+
7+
reset master;
8+
9+
create table t1 (a int primary key, b int) engine=ndb;
10+
11+
--echo Low level transport test
12+
set global debug='+d,ndb_injector_set_event_conflict_flags';
13+
14+
--echo Two inserts to the same table, with nothing else in the epoch,
15+
--echo will be grouped together in one binlog event, as the
16+
--echo extra row info is the same.
17+
insert into test.t1 values (1,1), (2,2);
18+
19+
# Wait for epoch boundary
20+
--disable_query_log
21+
--disable_result_log
22+
show binlog events;
23+
--enable_result_log
24+
--enable_query_log
25+
26+
--echo A separate delete.
27+
delete from test.t1 where a=2;
28+
29+
# Wait for epoch boundary
30+
--disable_query_log
31+
--disable_result_log
32+
show binlog events;
33+
--enable_result_log
34+
--enable_query_log
35+
36+
37+
--disable_query_log
38+
let $MYSQLD_DATADIR= `select @@datadir;`;
39+
--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 > $MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql
40+
41+
create table raw_binlog_rows (txt varchar(1000));
42+
43+
--eval load data local infile '$MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql' into table raw_binlog_rows columns terminated by '\n';
44+
45+
select txt from raw_binlog_rows where txt like '### Extra row data %';
46+
47+
48+
drop table raw_binlog_rows;
49+
--enable_query_log
50+
51+
set global debug='';
52+
delete from test.t1;
53+
reset master;
54+
55+
--echo Specific flag settings test : Reflect
56+
57+
set global debug='+d,ndb_set_reflect_anyvalue';
58+
59+
--echo Two inserts to the same table, with nothing else in the epoch,
60+
--echo will be grouped together in one binlog event, as the
61+
--echo extra row info is the same.
62+
insert into test.t1 values (1,1), (2,2);
63+
64+
# Wait for epoch boundary
65+
--disable_query_log
66+
--disable_result_log
67+
show binlog events;
68+
--enable_result_log
69+
--enable_query_log
70+
71+
--echo A separate delete.
72+
delete from test.t1 where a=2;
73+
74+
# Wait for epoch boundary
75+
--disable_query_log
76+
--disable_result_log
77+
show binlog events;
78+
--enable_result_log
79+
--enable_query_log
80+
81+
82+
--disable_query_log
83+
let $MYSQLD_DATADIR= `select @@datadir;`;
84+
--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 > $MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql
85+
86+
create table raw_binlog_rows (txt varchar(1000));
87+
88+
--eval load data local infile '$MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql' into table raw_binlog_rows columns terminated by '\n';
89+
90+
select txt from raw_binlog_rows where txt like '### Extra row data %';
91+
92+
93+
drop table raw_binlog_rows;
94+
--enable_query_log
95+
96+
set global debug='';
97+
delete from test.t1;
98+
reset master;
99+
100+
--echo Specific flag settings test : Refresh
101+
set global debug='+d,ndb_set_refresh_anyvalue';
102+
103+
--echo Two inserts to the same table, with nothing else in the epoch,
104+
--echo will be grouped together in one binlog event, as the
105+
--echo extra row info is the same.
106+
insert into test.t1 values (1,1), (2,2);
107+
108+
# Wait for epoch boundary
109+
--disable_query_log
110+
--disable_result_log
111+
show binlog events;
112+
--enable_result_log
113+
--enable_query_log
114+
115+
--echo A separate delete.
116+
delete from test.t1 where a=2;
117+
118+
# Wait for epoch boundary
119+
--disable_query_log
120+
--disable_result_log
121+
show binlog events;
122+
--enable_result_log
123+
--enable_query_log
124+
125+
126+
--disable_query_log
127+
let $MYSQLD_DATADIR= `select @@datadir;`;
128+
--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 > $MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql
129+
130+
create table raw_binlog_rows (txt varchar(1000));
131+
132+
--eval load data local infile '$MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql' into table raw_binlog_rows columns terminated by '\n';
133+
134+
select txt from raw_binlog_rows where txt like '### Extra row data %';
135+
136+
137+
drop table raw_binlog_rows;
138+
--enable_query_log
139+
140+
set global debug='';
141+
142+
drop table t1;

sql/ha_ndbcluster.cc

+32-1
Original file line numberDiff line numberDiff line change
@@ -4518,6 +4518,19 @@ ha_ndbcluster::eventSetAnyValue(THD *thd,
45184518
}
45194519
}
45204520
#ifndef DBUG_OFF
4521+
DBUG_EXECUTE_IF("ndb_set_reflect_anyvalue",
4522+
{
4523+
fprintf(stderr, "Ndb forcing reflect AnyValue\n");
4524+
options->optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE;
4525+
ndbcluster_anyvalue_set_reflect_op(options->anyValue);
4526+
});
4527+
DBUG_EXECUTE_IF("ndb_set_refresh_anyvalue",
4528+
{
4529+
fprintf(stderr, "Ndb forcing refresh AnyValue\n");
4530+
options->optionsPresent |= NdbOperation::OperationOptions::OO_ANYVALUE;
4531+
ndbcluster_anyvalue_set_refresh_op(options->anyValue);
4532+
});
4533+
45214534
/*
45224535
MySQLD will set the user-portion of AnyValue (if any) to all 1s
45234536
This tests code filtering ServerIds on the value of server-id-bits.
@@ -4590,10 +4603,28 @@ ha_ndbcluster::prepare_conflict_detection(enum_conflicting_op_type op_type,
45904603
if (thd->binlog_row_event_extra_data)
45914604
{
45924605
Ndb_binlog_extra_row_info extra_row_info;
4593-
extra_row_info.loadFromBuffer(thd->binlog_row_event_extra_data);
4606+
if (extra_row_info.loadFromBuffer(thd->binlog_row_event_extra_data) != 0)
4607+
{
4608+
sql_print_warning("NDB Slave : Malformed event received on table %s "
4609+
"cannot parse. Stopping Slave.",
4610+
m_share->key);
4611+
DBUG_RETURN( ER_SLAVE_CORRUPT_EVENT );
4612+
}
4613+
45944614
if (extra_row_info.getFlags() &
45954615
Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID)
45964616
transaction_id = extra_row_info.getTransactionId();
4617+
4618+
#ifndef DBUG_OFF
4619+
if (extra_row_info.getFlags() &
4620+
Ndb_binlog_extra_row_info::NDB_ERIF_CFT_FLAGS)
4621+
{
4622+
DBUG_PRINT("info",
4623+
("Slave : have conflict flags : %x\n",
4624+
extra_row_info.getConflictFlags()));
4625+
/* No effect currently */
4626+
}
4627+
#endif
45974628
}
45984629

45994630
{

sql/ha_ndbcluster_binlog.cc

+74-3
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ static pthread_mutex_t ndb_schema_share_mutex;
163163
extern my_bool opt_log_slave_updates;
164164
static my_bool g_ndb_log_slave_updates;
165165

166+
static bool g_injector_v1_warning_emitted = false;
167+
166168
#ifndef DBUG_OFF
167169
static void print_records(TABLE *table, const uchar *record)
168170
{
@@ -6250,6 +6252,9 @@ handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
62506252
Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
62516253
TABLE *table= event_data->shadow_table;
62526254
NDB_SHARE *share= event_data->share;
6255+
bool reflected_op = false;
6256+
bool refresh_op = false;
6257+
62536258
if (pOp != share->op)
62546259
{
62556260
return 0;
@@ -6258,12 +6263,30 @@ handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
62586263
uint32 anyValue= pOp->getAnyValue();
62596264
if (ndbcluster_anyvalue_is_reserved(anyValue))
62606265
{
6261-
if (!ndbcluster_anyvalue_is_nologging(anyValue))
6266+
if (ndbcluster_anyvalue_is_nologging(anyValue))
6267+
return 0;
6268+
6269+
if (ndbcluster_anyvalue_is_reflect_op(anyValue))
6270+
{
6271+
DBUG_PRINT("info", ("Anyvalue -> Reflect"));
6272+
reflected_op = true;
6273+
anyValue = 0;
6274+
}
6275+
else if (ndbcluster_anyvalue_is_refresh_op(anyValue))
6276+
{
6277+
DBUG_PRINT("info", ("Anyvalue -> Refresh"));
6278+
refresh_op = true;
6279+
anyValue = 0;
6280+
}
6281+
else
6282+
{
62626283
sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
62636284
"event not logged",
62646285
anyValue);
6265-
return 0;
6286+
return 0;
6287+
}
62666288
}
6289+
62676290
uint32 originating_server_id= ndbcluster_anyvalue_get_serverid(anyValue);
62686291
bool log_this_slave_update = g_ndb_log_slave_updates;
62696292
bool count_this_event = true;
@@ -6364,6 +6387,7 @@ handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
63646387
originating_server_id= ::server_id;
63656388
else
63666389
{
6390+
assert(!reflected_op && !refresh_op);
63676391
/* Track that we received a replicated row event */
63686392
if (likely( count_this_event ))
63696393
trans_slave_row_count++;
@@ -6397,7 +6421,54 @@ handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
63976421
{
63986422
extra_row_info.setFlags(Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID);
63996423
extra_row_info.setTransactionId(pOp->getTransId());
6400-
extra_row_info_ptr = extra_row_info.generateBuffer();
6424+
}
6425+
6426+
/* Set conflict flags member if necessary */
6427+
Uint16 event_conflict_flags = 0;
6428+
assert(! (reflected_op && refresh_op));
6429+
if (reflected_op)
6430+
{
6431+
event_conflict_flags |= NDB_ERIF_CFT_REFLECT_OP;
6432+
}
6433+
else if (refresh_op)
6434+
{
6435+
event_conflict_flags |= NDB_ERIF_CFT_REFRESH_OP;
6436+
}
6437+
6438+
DBUG_EXECUTE_IF("ndb_injector_set_event_conflict_flags",
6439+
{
6440+
event_conflict_flags = 0xfafa;
6441+
});
6442+
if (event_conflict_flags != 0)
6443+
{
6444+
extra_row_info.setFlags(Ndb_binlog_extra_row_info::NDB_ERIF_CFT_FLAGS);
6445+
extra_row_info.setConflictFlags(event_conflict_flags);
6446+
}
6447+
6448+
if (opt_ndb_log_transaction_id ||
6449+
event_conflict_flags != 0)
6450+
{
6451+
if (likely(!log_bin_use_v1_row_events))
6452+
{
6453+
extra_row_info_ptr = extra_row_info.generateBuffer();
6454+
}
6455+
else
6456+
{
6457+
/**
6458+
* Can't put the metadata in a v1 event
6459+
* Produce 1 warning at most
6460+
*/
6461+
if (!g_injector_v1_warning_emitted)
6462+
{
6463+
sql_print_error("NDB: Binlog Injector discarding row event "
6464+
"meta data as server is using v1 row events. "
6465+
"(%u %x)",
6466+
opt_ndb_log_transaction_id,
6467+
event_conflict_flags);
6468+
6469+
g_injector_v1_warning_emitted = true;
6470+
}
6471+
}
64016472
}
64026473

64036474
DBUG_ASSERT(trans.good());

0 commit comments

Comments
 (0)