简介
在主库semisync加载或初始化时,调用函数semi_sync_master_plugin_init,为transaction_delegate、binlog_storage_delegate、binlog_transmit_delegate增加observer,分别对应plugin的变量为trans_observer、storage_observer、transmit_observer。这三个observer定义了各自的函数接口。
代码分析
所有从server层向plugin的函数调用,都是通过函数指针来完成的,因此我们只需要搞清楚上述几个函数的调用逻辑:
static int semi_sync_master_plugin_init(void *p) { if (repl_semisync.initObject()) return 1; if (register_trans_observer(&trans_observer, p)) return 1; if (register_binlog_storage_observer(&storage_observer, p)) return 1; if (register_binlog_transmit_observer(&transmit_observer, p)) return 1; return 0; }
Binlog_storage_observer storage_observer = { sizeof(Binlog_storage_observer), // len repl_semi_report_binlog_update, // report_update,after_flush };
/** Observe binlog logging storage */ typedef struct Binlog_storage_observer { uint32 len; /** This callback is called after binlog has been flushed This callback is called after cached events have been flushed to binary log file but not yet synced. @param param Observer common parameter @param log_file Binlog file name been updated @param log_pos Binlog position after update @retval 0 Sucess @retval 1 Failure */ int (*after_flush)(Binlog_storage_param *param, const char *log_file, my_off_t log_pos); } Binlog_storage_observer;
阶段说明
该函数调用在第一个阶段,sync binlog之前,flush cache之后。对应的函数是repl_semi_report_binlog_update:
if (repl_semisync.getMasterEnabled()){ error= repl_semisync.writeTranxInBinlog(log_file,log_pos); }
在半同步开启的状态下,调用writeTranxInBinlog函数,更新flush最新的log_file和log_pos
mysql_mutex_lock(&LOCK_binlog_); if (commit_file_name_inited_){ int cmp = ActiveTranx::compare(log_file_name, log_file_pos, commit_file_name_, commit_file_pos_); if (cmp > 0){ strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ commit_file_pos_ = log_file_pos; } }else{ strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ commit_file_pos_ = log_file_pos; commit_file_name_inited_ = true; } if (is_on()){ if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)){ switch_off(); } } mysql_mutex_unlock(&LOCK_binlog_);
说明
1、在LOCK_binlog_锁内执行该代码块
2、当没有可以看到的最大事务binlog时,将flush后的binlog名和位置保存到commit_file_name_和commit_file_pos_中,并将commit_file_name_inited_置成TRUE之后,再次进来时会flush后的binlog位置和文件名会和这个变量比较,保存最大的binlog名和位置。
3、is_on函数即半同步标志state_是TRUE时,即开启了半同步,会将flush的最大的binlog名和位置插入到活跃事务链表中。 active_tranxs_:用于管理所有正在等待从库相应的事务。事务在binlog中有个结束的位置,该结束位置以一个结构体TranxNode表示。当有多个事务在等待时,所有事务的TranxNode按照binlog中结束的位置的前后顺序排列,使用next_组织起来,形成一个链表保存在active_tranxs_中。
struct TranxNode { char log_name_[FN_REFLEN]; my_off_t log_pos_; mysql_cond_t cond;//该条件变量作为半同步判断 int n_waiters; struct TranxNode *next_; /* the next node in the sorted list */ struct TranxNode *hash_next_; /* the next node during hash collision */ };
4、如果插入active_tranxs_链表失败,则需要调用switch_off函数将半同步关掉:
int ReplSemiSyncMaster::switch_off(){ state_ = false; rpl_semi_sync_master_off_times++; wait_file_name_inited_ = false; reply_file_name_inited_ = false; active_tranxs_->signal_waiting_sessions_all();--> |--for (TranxNode* entry= trx_front_; entry; entry=entry->next_) |-- mysql_cond_broadcast(&entry->cond); }
mysql_cond_broadcast(&entry->cond)广播该条件变量,让其他事务线程不在等待ACK了,直接进入异步流程走掉。
repl_semi_report_commit(after_commit)->commitTrx->mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
5、总结:
这个桩主要用来处理每个事务的binlog位置。首先根据每个提交事务对应的结束位置,保存当前实例中已经flush的事务的最大binlog位置。然后通过一个HASH表,记录下当前实例中所有提交事务的结束位置,hash值是binlog文件名和pos组成,缓存这些信息的目的是发送binlog时判断要不要等待从库的ACK,至于发送事务的最后一个binlog时才要在发送的binlog中打标记,告诉从库这个event需要接受ACK。