Skip to content

Commit 67d6fd6

Browse files
author
Karolina Szczepankiewicz
committed
WL#7353 Recover relay log by truncating half written transactions [2/3]
Step 2: Relay log sanitization This step implements relay log sanitization. Sanitization process runs during applier metadata initialization, in case relay-log-recovery server option is turned off. It is implemented in a separate class, 'Relay_log_sanitizer'. Whenever a partially-written transaction is encountered at the end of the file, sanitizer performs truncation of the relay log file via execution of the existing 'truncate_update_log_file' function. After the relay log truncation and loading information stored in applier metadata on disk, relay log sanitizer updates the position of the receiver thread. In case no valid source position could be obtained from the existing relay log files, sanitizer matches the receiver thread position with the applier source position read from the applier metadata. Change-Id: I13203812b2d73f38fca711473097daed55721f00
1 parent e908c71 commit 67d6fd6

File tree

9 files changed

+228
-17
lines changed

9 files changed

+228
-17
lines changed

sql/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,6 +1108,7 @@ SET (RPL_REPLICA_SRCS
11081108
rpl_reporting.cc
11091109
rpl_rli.cc
11101110
rpl_rli_pdb.cc
1111+
rpl_relay_log_sanitizer.cc
11111112
rpl_replica.cc
11121113
rpl_replica_commit_order_manager.cc
11131114
rpl_replica_until_options.cc

sql/binlog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,8 @@ class MYSQL_BIN_LOG : public TC_LOG {
845845

846846
private:
847847
bool after_write_to_relay_log(Master_info *mi);
848+
849+
public:
848850
/**
849851
* Truncte log file and clear LOG_EVENT_BINLOG_IN_USE_F when update is set.
850852
* @param[in] log_name name of the log file to be trunacted
@@ -858,7 +860,6 @@ class MYSQL_BIN_LOG : public TC_LOG {
858860
bool truncate_update_log_file(const char *log_name, my_off_t valid_pos,
859861
my_off_t binlog_size, bool update);
860862

861-
public:
862863
void make_log_name(char *buf, const char *log_ident);
863864
bool is_active(const char *log_file_name) const;
864865

sql/rpl_mi.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ class Master_info : public Rpl_info {
8888
friend class Rpl_info_factory;
8989

9090
public:
91+
/// In case we start replication from the first binary log file and
92+
/// source log name is empty, we use first_source_log_name instead of
93+
/// 'master_log_name' in the error log
94+
static constexpr auto first_source_log_name = "<First log>";
95+
9196
/**
9297
Host name or ip address stored in the master.info.
9398
*/

sql/rpl_relay_log_sanitizer.cc

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
//
3+
// This program is free software; you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License, version 2.0,
5+
// as published by the Free Software Foundation.
6+
//
7+
// This program is designed to work with certain software (including
8+
// but not limited to OpenSSL) that is licensed under separate terms,
9+
// as designated in a particular file or component or in included license
10+
// documentation. The authors of MySQL hereby grant you an additional
11+
// permission to link the program and your derivative works with the
12+
// separately licensed software that they have either included with
13+
// the program or referenced in the documentation.
14+
//
15+
// This program is distributed in the hope that it will be useful,
16+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
// GNU General Public License, version 2.0, for more details.
19+
//
20+
// You should have received a copy of the GNU General Public License
21+
// along with this program; if not, write to the Free Software
22+
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
23+
24+
#include "sql/rpl_relay_log_sanitizer.h"
25+
#include "sql/binlog.h"
26+
#include "sql/binlog/decompressing_event_object_istream.h" // Decompressing_event_object_istream
27+
#include "sql/binlog_reader.h" // Relay_log_file_reader
28+
#include "sql/rpl_mi.h" // Master_info
29+
30+
namespace rpl {
31+
32+
void Relay_log_sanitizer::analyze_logs(MYSQL_BIN_LOG &log,
33+
bool checksum_validation) {
34+
Relaylog_file_reader reader(checksum_validation);
35+
this->process_logs(reader, log);
36+
}
37+
38+
bool Relay_log_sanitizer::sanitize_log(MYSQL_BIN_LOG &log) {
39+
std::stringstream ss;
40+
if (!is_fatal_error() && is_log_truncation_needed()) {
41+
ss << "Truncating " << get_valid_file()
42+
<< " to log position: " << get_valid_pos();
43+
LogErr(INFORMATION_LEVEL, ER_LOG_SANITIZATION, ss.str().c_str());
44+
return log.truncate_update_log_file(
45+
get_valid_file().c_str(), get_valid_pos(), m_last_file_size, false);
46+
}
47+
if (is_fatal_error()) {
48+
ss << "Skipping log sanitization due to: " << this->m_failure_message;
49+
LogErr(INFORMATION_LEVEL, ER_LOG_SANITIZATION, ss.str().c_str());
50+
}
51+
return false;
52+
}
53+
54+
void Relay_log_sanitizer::update_source_position(Master_info *mi) {
55+
if (is_fatal_error()) return;
56+
std::string new_source_file{""};
57+
my_off_t new_source_pos{0};
58+
if (!this->m_valid_source_file.empty()) {
59+
// we are sure that we were able to obtain source position
60+
assert(this->m_has_valid_source_pos);
61+
new_source_file = m_valid_source_file;
62+
new_source_pos = m_valid_source_pos;
63+
64+
} else if (this->m_has_valid_source_pos) {
65+
// update only postion
66+
new_source_pos = m_valid_source_pos;
67+
new_source_file = mi->get_master_log_name();
68+
} else {
69+
// no valid position could have been recovered from the source file,
70+
// setting to applier source position
71+
new_source_pos = std::max<ulonglong>(BIN_LOG_HEADER_SIZE,
72+
mi->rli->get_group_master_log_pos());
73+
new_source_file = mi->rli->get_group_master_log_name();
74+
}
75+
76+
if (mi->get_master_log_name() != new_source_file ||
77+
mi->get_master_log_pos() != new_source_pos) {
78+
std::stringstream ss;
79+
std::string new_source_file_str = new_source_file.empty()
80+
? Master_info::first_source_log_name
81+
: new_source_file;
82+
ss << "Changing source log coordinates from: " << mi->get_io_rpl_log_name()
83+
<< "; " << mi->get_master_log_pos() << " to: " << new_source_file_str
84+
<< "; " << new_source_pos;
85+
LogErr(INFORMATION_LEVEL, ER_LOG_SANITIZATION, ss.str().c_str());
86+
// set position and filename
87+
mi->set_master_log_pos(new_source_pos);
88+
mi->set_master_log_name(new_source_file.c_str());
89+
}
90+
}
91+
92+
} // namespace rpl

sql/rpl_relay_log_sanitizer.h

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
//
3+
// This program is free software; you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License, version 2.0,
5+
// as published by the Free Software Foundation.
6+
//
7+
// This program is designed to work with certain software (including
8+
// but not limited to OpenSSL) that is licensed under separate terms,
9+
// as designated in a particular file or component or in included license
10+
// documentation. The authors of MySQL hereby grant you an additional
11+
// permission to link the program and your derivative works with the
12+
// separately licensed software that they have either included with
13+
// the program or referenced in the documentation.
14+
//
15+
// This program is distributed in the hope that it will be useful,
16+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
// GNU General Public License, version 2.0, for more details.
19+
//
20+
// You should have received a copy of the GNU General Public License
21+
// along with this program; if not, write to the Free Software
22+
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
23+
24+
#ifndef RPL_RELAY_LOG_SANITIZER_H
25+
#define RPL_RELAY_LOG_SANITIZER_H
26+
27+
#include <functional>
28+
#include "mysql/binlog/event/binlog_event.h"
29+
#include "sql/binlog.h"
30+
#include "sql/binlog/decompressing_event_object_istream.h" // binlog::Decompressing_event_object_istream
31+
#include "sql/binlog/log_sanitizer.h"
32+
#include "sql/binlog_ostream.h" // binlog::tools::Iterator
33+
#include "sql/binlog_reader.h" // Binlog_file_reader
34+
#include "sql/log_event.h" // Log_event
35+
#include "sql/xa.h" // XID
36+
37+
namespace rpl {
38+
39+
/// @brief Class used to recover relay log files
40+
/// @details Recovery of the relay log files is:
41+
/// - finding the last valid position outside of a transaction boundary
42+
/// (analyze_logs)
43+
/// - removing relay logs appearing after the relay log with the last valid
44+
/// position (analyze_logs)
45+
/// - truncation of the relay log file containing the last valid position
46+
/// to remove partially written transaction from the log (sanitize log)
47+
class Relay_log_sanitizer : public binlog::Log_sanitizer {
48+
public:
49+
/// @brief Ctor
50+
Relay_log_sanitizer() { this->m_validation_started = false; }
51+
52+
/// @brief Dtor
53+
~Relay_log_sanitizer() override = default;
54+
55+
/// @brief Given specific log, performs sanitization. Reads log list obtained
56+
/// from the MYSQL_BIN_LOG object and searches for last, fully written
57+
/// transaction. Removes log files that are created after last finished
58+
/// transaction
59+
/// @param log Handle to MYSQL_BIN_LOG object, which does not need to be
60+
/// open. We need specific functions from the MYSQL_BIN_LOG, e.g.
61+
/// reading of the index file
62+
/// @param checksum_validation True if we need to perform relay log file
63+
/// checksum validation
64+
void analyze_logs(MYSQL_BIN_LOG &log, bool checksum_validation);
65+
66+
/// @brief Sanitize opened log
67+
/// @param log Handle to MYSQL_BIN_LOG object, which we will truncate
68+
/// if needed
69+
/// @return false on no error or when no truncation was done, true otherwise
70+
bool sanitize_log(MYSQL_BIN_LOG &log);
71+
72+
/// @brief Updates source position if a valid source position has been
73+
/// found whilst reading the relay log files
74+
/// @param mi Master_info for the receiver thread.
75+
void update_source_position(Master_info *mi);
76+
77+
protected:
78+
/// @brief Function used to obtain memory key for derived classes
79+
/// @returns Reference to a memory key
80+
PSI_memory_key &get_memory_key() const override {
81+
return key_memory_relaylog_recovery;
82+
}
83+
};
84+
85+
} // namespace rpl
86+
87+
#endif // RPL_RELAY_LOG_SANITIZER_H

sql/rpl_replica.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,10 +1259,9 @@ static inline int fill_mts_gaps_and_recover(Master_info *mi) {
12591259
return recovery_error;
12601260
}
12611261

1262-
int load_mi_and_rli_from_repositories(Master_info *mi, bool ignore_if_no_info,
1263-
int thread_mask,
1264-
bool skip_received_gtid_set_recovery,
1265-
bool force_load) {
1262+
int load_mi_and_rli_from_repositories(
1263+
Master_info *mi, bool ignore_if_no_info, int thread_mask,
1264+
bool skip_received_gtid_set_and_relaylog_recovery, bool force_load) {
12661265
DBUG_TRACE;
12671266
assert(mi != nullptr && mi->rli != nullptr);
12681267
int init_error = 0;
@@ -1319,7 +1318,8 @@ int load_mi_and_rli_from_repositories(Master_info *mi, bool ignore_if_no_info,
13191318
if (!ignore_if_no_info || check_return != REPOSITORY_DOES_NOT_EXIST) {
13201319
if ((thread_mask & SLAVE_SQL) != 0 || !(mi->rli->inited)) {
13211320
if (!mi->rli->inited || force_load) {
1322-
if (mi->rli->rli_init_info(skip_received_gtid_set_recovery)) {
1321+
if (mi->rli->rli_init_info(
1322+
skip_received_gtid_set_and_relaylog_recovery)) {
13231323
init_error = 1;
13241324
} else {
13251325
/*

sql/rpl_replica.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -474,15 +474,16 @@ int init_recovery(Master_info *mi);
474474
are not yet initialized. When true this flag forces the repositories
475475
to load information from table or file.
476476
477-
@param skip_received_gtid_set_recovery When true, skips the received GTID
478-
set recovery.
477+
@param skip_received_gtid_set_and_relaylog_recovery When true, skips the
478+
received GTID set and relay log recovery.
479479
480480
@retval 0 Success
481481
@retval nonzero Error
482482
*/
483483
int load_mi_and_rli_from_repositories(
484484
Master_info *mi, bool ignore_if_no_info, int thread_mask,
485-
bool skip_received_gtid_set_recovery = false, bool force_load = false);
485+
bool skip_received_gtid_set_and_relaylog_recovery = false,
486+
bool force_load = false);
486487
void end_info(Master_info *mi);
487488
/**
488489
Clear the information regarding the `Master_info` and `Relay_log_info` objects

sql/rpl_rli.cc

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#include "sql/rpl_info_handler.h"
6565
#include "sql/rpl_mi.h" // Master_info
6666
#include "sql/rpl_msr.h" // channel_map
67+
#include "sql/rpl_relay_log_sanitizer.h"
6768
#include "sql/rpl_replica.h"
6869
#include "sql/rpl_reporting.h"
6970
#include "sql/rpl_rli_pdb.h" // Slave_worker
@@ -1501,14 +1502,21 @@ bool mysql_show_relaylog_events(THD *thd) {
15011502
return res;
15021503
}
15031504

1504-
int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
1505+
int Relay_log_info::rli_init_info(
1506+
bool skip_received_gtid_set_and_relaylog_recovery) {
15051507
int error = 0;
15061508
enum_return_check check_return = ERROR_CHECKING_REPOSITORY;
15071509
const char *msg = nullptr;
15081510
DBUG_TRACE;
15091511

15101512
mysql_mutex_assert_owner(&data_lock);
15111513

1514+
// Prepare for relay log sanitization
1515+
rpl::Relay_log_sanitizer relay_log_sanitizer;
1516+
bool execute_relay_log_sanitization =
1517+
!is_relay_log_recovery && !relay_log_sanitized &&
1518+
!skip_received_gtid_set_and_relaylog_recovery;
1519+
15121520
/*
15131521
If Relay_log_info is issued again after a failed init_info(), for
15141522
instance because of missing relay log files, it will generate new
@@ -1647,6 +1655,14 @@ int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
16471655
return 1;
16481656
}
16491657

1658+
if (execute_relay_log_sanitization) {
1659+
relay_log_sanitizer.analyze_logs(relay_log,
1660+
opt_replica_sql_verify_checksum);
1661+
// sanitize relay_log if applicable, ignore errors, they will be reported
1662+
// later on
1663+
relay_log_sanitizer.sanitize_log(relay_log);
1664+
}
1665+
16501666
if (!gtid_retrieved_initialized) {
16511667
/* Store the GTID of a transaction spanned in multiple relay log files */
16521668
Gtid_monitoring_info *partial_trx = mi->get_gtid_monitoring_info();
@@ -1656,6 +1672,7 @@ int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
16561672
gtid_set->dbug_print("set of GTIDs in relay log before initialization");
16571673
get_tsid_lock()->unlock();
16581674
#endif
1675+
16591676
/*
16601677
In the init_gtid_set below we pass the mi->transaction_parser.
16611678
This will be useful to ensure that we only add a GTID to
@@ -1669,7 +1686,7 @@ int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
16691686
init_recovery.
16701687
*/
16711688
if (!is_relay_log_recovery && !gtid_retrieved_initialized &&
1672-
!skip_received_gtid_set_recovery &&
1689+
!skip_received_gtid_set_and_relaylog_recovery &&
16731690
relay_log.init_gtid_sets(
16741691
gtid_set, nullptr, opt_replica_sql_verify_checksum,
16751692
true /*true=need lock*/, &mi->transaction_parser, partial_trx)) {
@@ -1689,14 +1706,13 @@ int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
16891706
correctly compute the set of previous gtids.
16901707
*/
16911708
relay_log.set_previous_gtid_set_relaylog(gtid_set);
1709+
16921710
/*
16931711
note, that if open() fails, we'll still have index file open
16941712
but a destructor will take care of that
16951713
*/
1696-
16971714
mysql_mutex_t *log_lock = relay_log.get_log_lock();
16981715
mysql_mutex_lock(log_lock);
1699-
17001716
if (relay_log.open_binlog(
17011717
ln, nullptr,
17021718
(max_relay_log_size ? max_relay_log_size : max_binlog_size), true,
@@ -1706,7 +1722,6 @@ int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
17061722
LogErr(ERROR_LEVEL, ER_RPL_CANT_OPEN_LOG_IN_AM_INIT_INFO);
17071723
return 1;
17081724
}
1709-
17101725
mysql_mutex_unlock(log_lock);
17111726
}
17121727

@@ -1830,6 +1845,12 @@ int Relay_log_info::rli_init_info(bool skip_received_gtid_set_recovery) {
18301845
goto err;
18311846
}
18321847

1848+
// applier metadata was read from disk, update receiver positions
1849+
if (execute_relay_log_sanitization) {
1850+
relay_log_sanitizer.update_source_position(mi);
1851+
relay_log_sanitized = true;
1852+
}
1853+
18331854
/*
18341855
In case of MTS the recovery is deferred until the end of
18351856
load_mi_and_rli_from_repositories.

sql/rpl_rli.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,9 @@ class Relay_log_info : public Rpl_info {
764764
/* Flag that ensures the retrieved GTID set is initialized only once. */
765765
bool gtid_retrieved_initialized;
766766

767+
/// Flag that ensures the relay log is sanitized only once.
768+
bool relay_log_sanitized = false;
769+
767770
/**
768771
Stores information on the last processed transaction or the transaction
769772
that is currently being processed.
@@ -1555,13 +1558,13 @@ class Relay_log_info : public Rpl_info {
15551558
replication log files was ON and the keyring plugin is not available
15561559
anymore.
15571560
1558-
@param skip_received_gtid_set_recovery When true, skips the received GTID
1559-
set recovery.
1561+
@param skip_received_gtid_set_and_relaylog_recovery When true, skips the
1562+
received GTID set and relay log recovery.
15601563
15611564
@retval 0 Success.
15621565
@retval 1 Error.
15631566
*/
1564-
int rli_init_info(bool skip_received_gtid_set_recovery = false);
1567+
int rli_init_info(bool skip_received_gtid_set_and_relaylog_recovery = false);
15651568
void end_info();
15661569

15671570
/** No flush options given to relay log flush */

0 commit comments

Comments
 (0)