摘要:在事務(wù)提交結(jié)束之后,事務(wù)可能提交成功,也可能提交失敗。需要把這個(gè)狀態(tài)告知如果發(fā)生了,那么輸出的類型就為,如果成功提交,那么輸出的類型就為。,當(dāng)完成自己所有的狀態(tài)變更之后,會(huì)把的狀態(tài)改為。
作者:姚維
TiDB Binlog Overview這篇文章不是講 TiDB Binlog 組件的源碼,而是講 TiDB 在執(zhí)行 DML/DDL 語句過程中,如何將 Binlog 數(shù)據(jù) 發(fā)送給 TiDB Binlog 集群的 Pump 組件。目前 TiDB 在 DML 上的 Binlog 用的類似 Row-based 的格式。具體 Binlog 具體的架構(gòu)細(xì)節(jié)可以參考這篇 文章。
這里只描述 TiDB 中的代碼實(shí)現(xiàn)。
DML BinlogTiDB 采用 protobuf 來編碼 binlog,具體的格式可以見 binlog.proto。這里討論 TiDB 寫 Binlog 的機(jī)制,以及 Binlog 對(duì) TiDB 寫入的影響。
TiDB 會(huì)在 DML 語句提交,以及 DDL 語句完成的時(shí)候,向 pump 輸出 Binlog。
Statement 執(zhí)行階段DML 語句包括 Insert/Replace、Update、Delete,這里挑 Insert 語句來闡述,其他的語句行為都類似。首先在 Insert 語句執(zhí)行完插入(未提交)之前,會(huì)把自己新增的數(shù)據(jù)記錄在 binlog.TableMutation 結(jié)構(gòu)體中。
// TableMutation 存儲(chǔ)表中數(shù)據(jù)的變化 message TableMutation { // 表的 id,唯一標(biāo)識(shí)一個(gè)表 optional int64 table_id = 1 [(gogoproto.nullable) = false]; // 保存插入的每行數(shù)據(jù) repeated bytes inserted_rows = 2; // 保存修改前和修改后的每行的數(shù)據(jù) repeated bytes updated_rows = 3; // 已廢棄 repeated int64 deleted_ids = 4; // 已廢棄 repeated bytes deleted_pks = 5; // 刪除行的數(shù)據(jù) repeated bytes deleted_rows = 6; // 記錄數(shù)據(jù)變更的順序 repeated MutationType sequence = 7; }
這個(gè)結(jié)構(gòu)體保存于跟每個(gè) Session 鏈接相關(guān)的事務(wù)上下文結(jié)構(gòu)體中 TxnState.mutations。 一張表對(duì)應(yīng)一個(gè) TableMutation 對(duì)象,TableMutation 里面保存了這個(gè)事務(wù)對(duì)這張表的所有變更數(shù)據(jù)。Insert 會(huì)把當(dāng)前語句插入的行,根據(jù) RowID + Row-value 的格式編碼之后,追加到 TableMutation.InsertedRows 中:
func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum, colIDs []int64) error { mutation := t.getMutation(ctx) pk, err := codec.EncodeValue(ctx.GetSessionVars().StmtCtx, nil, types.NewIntDatum(h)) if err != nil { return errors.Trace(err) } value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil) if err != nil { return errors.Trace(err) } bin := append(pk, value...) mutation.InsertedRows = append(mutation.InsertedRows, bin) mutation.Sequence = append(mutation.Sequence, binlog.MutationType_Insert) return nil }
等到所有的語句都執(zhí)行完之后,在 TxnState.mutations 中就保存了當(dāng)前事務(wù)對(duì)所有表的變更數(shù)據(jù)。
Commit 階段對(duì)于 DML 而言,TiDB 的事務(wù)采用 2-phase-commit 算法,一次事務(wù)提交會(huì)分為 Prewrite 階段,以及 Commit 階段。這里分兩個(gè)階段來看看 TiDB 具體的行為。
Prewrite Binlog在 session.doCommit 函數(shù)中,TiDB 會(huì)構(gòu)造 binlog.PrewriteValue:
message PrewriteValue { optional int64 schema_version = 1 [(gogoproto.nullable) = false]; repeated TableMutation mutations = 2 [(gogoproto.nullable) = false]; }
這個(gè) PrewriteValue 中包含了跟這次變動(dòng)相關(guān)的所有行數(shù)據(jù),TiDB 會(huì)填充一個(gè)類型為 binlog.BinlogType_Prewrite 的 Binlog:
info := &binloginfo.BinlogInfo{ Data: &binlog.Binlog{ Tp: binlog.BinlogType_Prewrite, PrewriteValue: prewriteData, }, Client: s.sessionVars.BinlogClient.(binlog.PumpClient), }
TiDB 這里用一個(gè)事務(wù)的 Option kv.BinlogInfo 來把 BinlogInfo 綁定到當(dāng)前要提交的 transaction 對(duì)象中:
s.txn.SetOption(kv.BinlogInfo, info)
在 twoPhaseCommitter.execute 中,在把數(shù)據(jù) prewrite 到 TiKV 的同時(shí),會(huì)調(diào)用 twoPhaseCommitter.prewriteBinlog,這里會(huì)把關(guān)聯(lián)的 binloginfo.BinlogInfo 取出來,把 Binlog 的 binlog.PrewriteValue 輸出到 Pump。
binlogChan := c.prewriteBinlog() err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), c.keys) if binlogChan != nil { binlogErr := <-binlogChan // 等待 write prewrite binlog 完成 if binlogErr != nil { return errors.Trace(binlogErr) } }
這里值得注意的是,在 prewrite 階段,是需要等待 write prewrite binlog 完成之后,才能繼續(xù)做接下去的提交的,這里是為了保證 TiDB 成功提交的事務(wù),Pump 至少一定能收到 Prewrite Binlog。
Commit Binlog在 twoPhaseCommitter.execute 事務(wù)提交結(jié)束之后,事務(wù)可能提交成功,也可能提交失敗。TiDB 需要把這個(gè)狀態(tài)告知 Pump:
err = committer.execute(ctx) if err != nil { committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0) return errors.Trace(err) } committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
如果發(fā)生了 error,那么輸出的 Binlog 類型就為 binlog.BinlogType_Rollback,如果成功提交,那么輸出的 Binlog 類型就為 binlog.BinlogType_Commit。
func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int64) { if !c.shouldWriteBinlog() { return } binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo) binInfo.Data.Tp = tp binInfo.Data.CommitTs = commitTS go func() { err := binInfo.WriteBinlog(c.store.clusterID) if err != nil { log.Errorf("failed to write binlog: %v", err) } }() }
值得注意的是,這里 WriteBinlog 是多帶帶啟動(dòng) goroutine 異步完成的,也就是 Commit 階段,是不再需要等待寫 binlog 完成的。這里可以節(jié)省一點(diǎn) commit 的等待時(shí)間,這里不需要等待是因?yàn)?Pump 即使接收不到這個(gè) Commit Binlog,在超過 timeout 時(shí)間后,Pump 會(huì)自行根據(jù) Prewrite Binlog 到 TiKV 中確認(rèn)當(dāng)條事務(wù)的提交狀態(tài)。
DDL Binlog一個(gè) DDL 有如下幾個(gè)狀態(tài):
const ( JobStateNone JobState = 0 JobStateRunning JobState = 1 JobStateRollingback JobState = 2 JobStateRollbackDone JobState = 3 JobStateDone JobState = 4 JobStateSynced JobState = 6 JobStateCancelling JobState = 7 )
這些狀態(tài)代表了一個(gè) DDL 任務(wù)所處的狀態(tài):
JobStateNone,代表 DDL 任務(wù)還在處理隊(duì)列,TiDB 還沒有開始做這個(gè) DDL。
JobStateRunning,當(dāng) DDL Owner 開始處理這個(gè)任務(wù)的時(shí)候,會(huì)把狀態(tài)設(shè)置為 JobStateRunning,之后 DDL 會(huì)開始變更,TiDB 的 Schema 可能會(huì)涉及多個(gè)狀態(tài)的變更,這中間不會(huì)改變 DDL job 的狀態(tài),只會(huì)變更 Schema 的狀態(tài)。
JobStateDone, 當(dāng) TiDB 完成自己所有的 Schema 狀態(tài)變更之后,會(huì)把 Job 的狀態(tài)改為 Done。
JobStateSynced,當(dāng) TiDB 每做一次 schema 狀態(tài)變更,就會(huì)需要跟集群中的其他 TiDB 做一次同步,但是當(dāng) Job 狀態(tài)為 JobStateDone 之后,在 TiDB 等到所有的 TiDB 節(jié)點(diǎn)同步之后,會(huì)將狀態(tài)修改為 JobStateSynced。
JobStateCancelling,TiDB 提供語法 ADMIN CANCEL DDL JOBS job_ids 用于取消某個(gè)正在執(zhí)行或者還未執(zhí)行的 DDL 任務(wù),當(dāng)成功執(zhí)行這個(gè)命令之后,DDL 任務(wù)的狀態(tài)會(huì)變?yōu)?JobStateCancelling。
JobStateRollingback,當(dāng) DDL Owner 發(fā)現(xiàn) Job 的狀態(tài)變?yōu)?JobStateCancelling 之后,它會(huì)將 job 的狀態(tài)改變?yōu)?JobStateRollingback,以示已經(jīng)開始處理 cancel 請(qǐng)求。
JobStateRollbackDone,在做 cancel 的過程,也會(huì)涉及 Schema 狀態(tài)的變更,也需要經(jīng)歷 Schema 的同步,等到狀態(tài)回滾已經(jīng)做完了,TiDB 會(huì)將 Job 的狀態(tài)設(shè)置為 JobStateRollbackDone。
對(duì)于 Binlog 而言,DDL 的 Binlog 輸出機(jī)制,跟 DML 語句也是類似的,只有開始處理事務(wù)提交階段,才會(huì)開始寫 Binlog 出去。那么對(duì)于 DDL 來說,跟 DML 不一樣,DML 有事務(wù)的概念,對(duì)于 DDL 來說,SQL 的事務(wù)是不影響 DDL 語句的。但是 DDL 里面,上面提到的 Job 的狀態(tài)變更,是作為一個(gè)事務(wù)來提交的(保證狀態(tài)一致性)。所以在每個(gè)狀態(tài)變更,都會(huì)有一個(gè)事務(wù)與之對(duì)應(yīng),但是上面提到的中間狀態(tài),DDL 并不會(huì)往外寫 Binlog,只有 JobStateRollbackDone 以及 JobStateDone 這兩種狀態(tài),TiDB 會(huì)認(rèn)為 DDL 語句已經(jīng)完成,會(huì)對(duì)外發(fā)送 Binlog,發(fā)送之前,會(huì)把 Job 的狀態(tài)從 JobStateDone 修改為 JobStateSynced,這次修改,也涉及一次事務(wù)提交。這塊邏輯的代碼如下:
worker.handleDDLJobQueue(): if job.IsDone() || job.IsRollbackDone() { binloginfo.SetDDLBinlog(d.binlogCli, txn, job.ID, job.Query) if !job.IsRollbackDone() { job.State = model.JobStateSynced } err = w.finishDDLJob(t, job) return errors.Trace(err) } type Binlog struct { DdlQuery []byte DdlJobId int64 }
DdlQuery 會(huì)設(shè)置為原始的 DDL 語句,DdlJobId 會(huì)設(shè)置為 DDL 的任務(wù) ID。
對(duì)于最后一次 Job 狀態(tài)的提交,會(huì)有兩條 Binlog 與之對(duì)應(yīng),這里有幾種情況:
如果事務(wù)提交成功,類型分別為 binlog.BinlogType_Prewrite 和 binlog.BinlogType_Commit。
如果事務(wù)提交失敗,類型分別為 binlog.BinlogType_Prewrite 和 binlog.BinlogType_Rollback。
所以,Pumps 收到的 DDL Binlog,如果類型為 binlog.BinlogType_Rollback 應(yīng)該只認(rèn)為如下狀態(tài)是合法的:
JobStateDone (因?yàn)樾薷臑?JobStateSynced 還未成功)
JobStateRollbackDone
如果類型為 binlog.BinlogType_Commit,應(yīng)該只認(rèn)為如下狀態(tài)是合法的:
JobStateSynced
JobStateRollbackDone
當(dāng) TiDB 在提交最后一個(gè) Job 狀態(tài)的時(shí)候,如果事務(wù)提交失敗了,那么 TiDB Owner 會(huì)嘗試?yán)^續(xù)修改這個(gè) Job,直到成功。也就是對(duì)于同一個(gè) DdlJobId,后續(xù)還可能會(huì)有多次 Binlog,直到出現(xiàn) binlog.BinlogType_Commit。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/17884.html
摘要:總體而言,讀者需要有一定的使用經(jīng)驗(yàn),以及可以讀懂語言程序。內(nèi)容概要本篇作為源碼閱讀系列文章的序篇,會(huì)簡(jiǎn)單的給大家講一下后續(xù)會(huì)講哪些部分以及邏輯順序,方便大家對(duì)本系列文章有整體的了解。小結(jié)本篇文章主要介紹了源碼閱讀系列文章的目的和規(guī)劃。 作者:黃佳豪 TiDB Binlog 組件用于收集 TiDB 的 binlog,并準(zhǔn)實(shí)時(shí)同步給下游,如 TiDB、MySQL 等。該組件在功能上類似于 ...
閱讀 3209·2023-04-26 02:27
閱讀 2138·2021-11-22 14:44
閱讀 4082·2021-10-22 09:54
閱讀 3194·2021-10-14 09:43
閱讀 747·2021-09-23 11:53
閱讀 12674·2021-09-22 15:33
閱讀 2703·2019-08-30 15:54
閱讀 2681·2019-08-30 14:04