@@ -2922,101 +2922,6 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *, THD *thd) {
2922
2922
return !trans_cannot_safely_rollback (thd);
2923
2923
}
2924
2924
2925
- /**
2926
- Adjust log offset in the binary log file for all running slaves
2927
- This class implements call back function for do_for_all_thd().
2928
- It is called for each thd in thd list to adjust offset.
2929
- */
2930
- class Adjust_offset : public Do_THD_Impl {
2931
- public:
2932
- Adjust_offset(my_off_t value) : m_purge_offset(value) {}
2933
- void operator()(THD *thd) override {
2934
- LOG_INFO *linfo;
2935
- mysql_mutex_lock(&thd->LOCK_thd_data);
2936
- if ((linfo = thd->current_linfo)) {
2937
- /*
2938
- Index file offset can be less that purge offset only if
2939
- we just started reading the index file. In that case
2940
- we have nothing to adjust.
2941
- */
2942
- if (linfo->index_file_offset < m_purge_offset)
2943
- linfo->fatal = (linfo->index_file_offset != 0);
2944
- else
2945
- linfo->index_file_offset -= m_purge_offset;
2946
- }
2947
- mysql_mutex_unlock(&thd->LOCK_thd_data);
2948
- }
2949
-
2950
- private:
2951
- my_off_t m_purge_offset;
2952
- };
2953
-
2954
- /*
2955
- Adjust the position pointer in the binary log file for all running slaves.
2956
-
2957
- SYNOPSIS
2958
- adjust_linfo_offsets()
2959
- purge_offset Number of bytes removed from start of log index file
2960
-
2961
- NOTES
2962
- - This is called when doing a PURGE when we delete lines from the
2963
- index log file.
2964
-
2965
- REQUIREMENTS
2966
- - Before calling this function, we have to ensure that no threads are
2967
- using any binary log file before purge_offset.
2968
-
2969
- TODO
2970
- - Inform the slave threads that they should sync the position
2971
- in the binary log file with flush_relay_log_info.
2972
- Now they sync is done for next read.
2973
- */
2974
- static void adjust_linfo_offsets(my_off_t purge_offset) {
2975
- Adjust_offset adjust_offset(purge_offset);
2976
- Global_THD_manager::get_instance()->do_for_all_thd(&adjust_offset);
2977
- }
2978
-
2979
- /**
2980
- This class implements Call back function for do_for_all_thd().
2981
- It is called for each thd in thd list to count
2982
- threads using bin log file
2983
- */
2984
-
2985
- class Log_in_use : public Do_THD_Impl {
2986
- public:
2987
- Log_in_use(const char *value) : m_log_name(value), m_count(0) {
2988
- m_log_name_len = strlen(m_log_name) + 1;
2989
- }
2990
- void operator()(THD *thd) override {
2991
- LOG_INFO *linfo;
2992
- mysql_mutex_lock(&thd->LOCK_thd_data);
2993
- if ((linfo = thd->current_linfo)) {
2994
- if (!strncmp(m_log_name, linfo->log_file_name, m_log_name_len)) {
2995
- LogErr(WARNING_LEVEL, ER_BINLOG_FILE_BEING_READ_NOT_PURGED, m_log_name,
2996
- thd->thread_id());
2997
- m_count++;
2998
- }
2999
- }
3000
- mysql_mutex_unlock(&thd->LOCK_thd_data);
3001
- }
3002
- int get_count() { return m_count; }
3003
-
3004
- private:
3005
- const char *m_log_name;
3006
- size_t m_log_name_len;
3007
- int m_count;
3008
- };
3009
-
3010
- static int log_in_use(const char *log_name) {
3011
- Log_in_use log_in_use(log_name);
3012
- #ifndef NDEBUG
3013
- if (current_thd)
3014
- DEBUG_SYNC(current_thd, "purge_logs_after_lock_index_before_thread_count");
3015
- #endif
3016
- Global_THD_manager::get_instance()->do_for_all_thd(&log_in_use);
3017
- return log_in_use.get_count();
3018
- }
3019
-
3020
2925
static bool purge_error_message (THD *thd, int res) {
3021
2926
uint errcode;
3022
2927
@@ -3361,9 +3266,8 @@ bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log) {
3361
3266
goto err;
3362
3267
}
3363
3268
3364
- mysql_mutex_lock(&thd->LOCK_thd_data);
3365
- thd->current_linfo = &linfo;
3366
- mysql_mutex_unlock(&thd->LOCK_thd_data);
3269
+ linfo.thread_id = thd->thread_id ();
3270
+ binary_log->register_log_info (&linfo);
3367
3271
3368
3272
BINLOG_FILE_READER binlog_file_reader (
3369
3273
opt_source_verify_checksum,
@@ -3470,9 +3374,8 @@ bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log) {
3470
3374
} else
3471
3375
my_eof (thd);
3472
3376
3473
- mysql_mutex_lock(&thd->LOCK_thd_data);
3474
- thd->current_linfo = nullptr;
3475
- mysql_mutex_unlock(&thd->LOCK_thd_data);
3377
+ binary_log->unregister_log_info (&linfo);
3378
+
3476
3379
return !errmsg.empty ();
3477
3380
}
3478
3381
@@ -3553,6 +3456,7 @@ void MYSQL_BIN_LOG::cleanup() {
3553
3456
mysql_mutex_destroy (&LOCK_sync);
3554
3457
mysql_mutex_destroy (&LOCK_binlog_end_pos);
3555
3458
mysql_mutex_destroy (&LOCK_xids);
3459
+ mysql_mutex_destroy (&LOCK_log_info);
3556
3460
mysql_cond_destroy (&update_cond);
3557
3461
mysql_cond_destroy (&m_prep_xids_cond);
3558
3462
if (!is_relay_log) {
@@ -3577,6 +3481,7 @@ void MYSQL_BIN_LOG::init_pthread_objects() {
3577
3481
mysql_mutex_init (m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
3578
3482
MY_MUTEX_INIT_FAST);
3579
3483
mysql_mutex_init (m_key_LOCK_xids, &LOCK_xids, MY_MUTEX_INIT_FAST);
3484
+ mysql_mutex_init (m_key_LOCK_log_info, &LOCK_log_info, MY_MUTEX_INIT_FAST);
3580
3485
mysql_cond_init (m_key_update_cond, &update_cond);
3581
3486
mysql_cond_init (m_key_prep_xids_cond, &m_prep_xids_cond);
3582
3487
if (!is_relay_log) {
@@ -7316,6 +7221,64 @@ int MYSQL_BIN_LOG::get_gtid_executed(Sid_map *sid_map, Gtid_set *gtid_set) {
7316
7221
return error;
7317
7222
}
7318
7223
7224
+ void MYSQL_BIN_LOG::register_log_info (LOG_INFO *log_info) {
7225
+ DBUG_TRACE;
7226
+ MUTEX_LOCK (lock, &LOCK_log_info);
7227
+ log_info_set.insert (log_info);
7228
+ }
7229
+
7230
+ void MYSQL_BIN_LOG::unregister_log_info (LOG_INFO *log_info) {
7231
+ DBUG_TRACE;
7232
+ MUTEX_LOCK (lock, &LOCK_log_info);
7233
+ log_info_set.erase (log_info);
7234
+ }
7235
+
7236
+ int MYSQL_BIN_LOG::log_in_use (const char *log_name) {
7237
+ DBUG_TRACE;
7238
+ #ifndef NDEBUG
7239
+ if (current_thd)
7240
+ DEBUG_SYNC (current_thd, " purge_logs_after_lock_index_before_thread_count" );
7241
+ #endif
7242
+
7243
+ mysql_mutex_assert_owner (&LOCK_index);
7244
+ MUTEX_LOCK (lock_log_info, &LOCK_log_info);
7245
+
7246
+ int count = 0 ;
7247
+ int log_name_len = strlen (log_name) + 1 ;
7248
+
7249
+ std::for_each (
7250
+ log_info_set.cbegin (), log_info_set.cend (),
7251
+ [log_name, log_name_len, &count](LOG_INFO *log_info) {
7252
+ if (!strncmp (log_name, log_info->log_file_name , log_name_len)) {
7253
+ LogErr (WARNING_LEVEL, ER_BINLOG_FILE_BEING_READ_NOT_PURGED, log_name,
7254
+ log_info->thread_id );
7255
+ count++;
7256
+ }
7257
+ });
7258
+
7259
+ return count;
7260
+ }
7261
+
7262
+ void MYSQL_BIN_LOG::adjust_linfo_offsets (my_off_t purge_offset) {
7263
+ DBUG_TRACE;
7264
+
7265
+ mysql_mutex_assert_owner (&LOCK_index);
7266
+ MUTEX_LOCK (lock_log_info, &LOCK_log_info);
7267
+
7268
+ std::for_each (log_info_set.cbegin (), log_info_set.cend (),
7269
+ [purge_offset](LOG_INFO *log_info) {
7270
+ /*
7271
+ Index file offset can be less that purge offset only if
7272
+ we just started reading the index file. In that case
7273
+ we have nothing to adjust.
7274
+ */
7275
+ if (log_info->index_file_offset < purge_offset)
7276
+ log_info->fatal = (log_info->index_file_offset != 0 );
7277
+ else
7278
+ log_info->index_file_offset -= purge_offset;
7279
+ });
7280
+ }
7281
+
7319
7282
/* *
7320
7283
Write the contents of the given IO_CACHE to the binary log.
7321
7284
0 commit comments