简介
在主库semisync加载或初始化时,调用函数semi_sync_master_plugin_init,为transaction_delegate、binlog_storage_delegate、binlog_transmit_delegate增加observer,分别对应plugin的变量为trans_observer、storage_observer、transmit_observer。这三个observer定义了各自的函数接口。
代码分析
所有从server层向plugin的函数调用,都是通过函数指针来完成的,因此我们只需要搞清楚上述几个函数的调用逻辑:
static int semi_sync_master_plugin_init(void *p)
{
if (repl_semisync.initObject())
return 1;
if (register_trans_observer(&trans_observer, p))
return 1;
if (register_binlog_storage_observer(&storage_observer, p))
return 1;
if (register_binlog_transmit_observer(&transmit_observer, p))
return 1;
return 0;
}
Binlog_storage_observer storage_observer = {
sizeof(Binlog_storage_observer), // len
repl_semi_report_binlog_update, // report_update,after_flush
};
/**
Observe binlog logging storage
*/
typedef struct Binlog_storage_observer {
uint32 len;
/**
This callback is called after binlog has been flushed
This callback is called after cached events have been flushed to
binary log file but not yet synced.
@param param Observer common parameter
@param log_file Binlog file name been updated
@param log_pos Binlog position after update
@retval 0 Sucess
@retval 1 Failure
*/
int (*after_flush)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos);
} Binlog_storage_observer;
阶段说明
该函数调用在第一个阶段,sync binlog之前,flush cache之后。对应的函数是repl_semi_report_binlog_update:
if (repl_semisync.getMasterEnabled()){
error= repl_semisync.writeTranxInBinlog(log_file,log_pos);
}
在半同步开启的状态下,调用writeTranxInBinlog函数,更新flush最新的log_file和log_pos
mysql_mutex_lock(&LOCK_binlog_);
if (commit_file_name_inited_){
int cmp = ActiveTranx::compare(log_file_name, log_file_pos,
commit_file_name_, commit_file_pos_);
if (cmp > 0){
strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
commit_file_pos_ = log_file_pos;
}
}else{
strncpy(commit_file_name_, log_file_name, FN_REFLEN-1);
commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
commit_file_pos_ = log_file_pos;
commit_file_name_inited_ = true;
}
if (is_on()){
if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)){
switch_off();
}
}
mysql_mutex_unlock(&LOCK_binlog_);
说明
1、在LOCK_binlog_锁内执行该代码块
2、当没有可以看到的最大事务binlog时,将flush后的binlog名和位置保存到commit_file_name_和commit_file_pos_中,并将commit_file_name_inited_置成TRUE之后,再次进来时会flush后的binlog位置和文件名会和这个变量比较,保存最大的binlog名和位置。
3、is_on函数即半同步标志state_是TRUE时,即开启了半同步,会将flush的最大的binlog名和位置插入到活跃事务链表中。 active_tranxs_:用于管理所有正在等待从库相应的事务。事务在binlog中有个结束的位置,该结束位置以一个结构体TranxNode表示。当有多个事务在等待时,所有事务的TranxNode按照binlog中结束的位置的前后顺序排列,使用next_组织起来,形成一个链表保存在active_tranxs_中。
struct TranxNode {
char log_name_[FN_REFLEN];
my_off_t log_pos_;
mysql_cond_t cond;//该条件变量作为半同步判断
int n_waiters;
struct TranxNode *next_; /* the next node in the sorted list */
struct TranxNode *hash_next_; /* the next node during hash collision */
};
4、如果插入active_tranxs_链表失败,则需要调用switch_off函数将半同步关掉:
int ReplSemiSyncMaster::switch_off(){
state_ = false;
rpl_semi_sync_master_off_times++;
wait_file_name_inited_ = false;
reply_file_name_inited_ = false;
active_tranxs_->signal_waiting_sessions_all();-->
|--for (TranxNode* entry= trx_front_; entry; entry=entry->next_)
|-- mysql_cond_broadcast(&entry->cond);
}
mysql_cond_broadcast(&entry->cond)广播该条件变量,让其他事务线程不在等待ACK了,直接进入异步流程走掉。
repl_semi_report_commit(after_commit)->commitTrx->mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
5、总结:
这个桩主要用来处理每个事务的binlog位置。首先根据每个提交事务对应的结束位置,保存当前实例中已经flush的事务的最大binlog位置。然后通过一个HASH表,记录下当前实例中所有提交事务的结束位置,hash值是binlog文件名和pos组成,缓存这些信息的目的是发送binlog时判断要不要等待从库的ACK,至于发送事务的最后一个binlog时才要在发送的binlog中打标记,告诉从库这个event需要接受ACK。

