摘要:作者比原項目倉庫地址地址在前一篇中,我們說到,當比原向其它節點請求區塊數據時,會發送一個把需要的區塊告訴對方,并把該信息對應的二進制數據放入對應的通道中,等待發送。這個就是真正與連接對象綁定的一個緩存區,寫入到它里面的數據,會被發送出去。
作者:freewind
比原項目倉庫:
Github地址:https://github.com/Bytom/bytom
Gitee地址:https://gitee.com/BytomBlockc...
在前一篇中,我們說到,當比原向其它節點請求區塊數據時,BlockKeeper會發送一個BlockRequestMessage把需要的區塊height告訴對方,并把該信息對應的二進制數據放入ProtocolReactor對應的sendQueue通道中,等待發送。而具體的發送細節,由于邏輯比較復雜,所以在前一篇中并未詳解,放到本篇中。
由于sendQueue是一個通道,數據放進去后,到底是由誰在什么情況下取走并發送,BlockKeeper這邊是不知道的。經過我們在代碼中搜索,發現只有一個類型會直接監視sendQueue中的數據,它就是前文出現的MConnection。MConnection的對象在它的OnStart方法中,會監視sendQueue中的數據,然后,等發現數據時,會將之取走并放入一個叫sending的通道里。
事情變得有點復雜了:
由前篇我們知道,一個MConnection對應了一個與peer的連接,而比原節點之間建立連接的情況又有多種:比如主動連接別的節點,或者別的節點主動連上我
放入通道sending之后,我們還需要知道又是誰在什么情況下會監視sending,取走它里面的數據
sending中的數據被取走后,又是如何被發送到其它節點的呢?
還是像以前一樣,遇到復雜的問題,我們先通過“相互獨立,完全窮盡”的原則,把它分解成一個個小問題,然后依次解決。
那么首先我們需要弄清楚的是:
比原在什么情況下,會創建MConnection的對象并調用其OnStart方法?(從而我們知道sendQueue中的數據是如何被監視的)
經過分析,我們發現MConnection的啟動,只出現在一個地方,即Peer的OnStart方法中。那么就這個問題就變成了:比原在什么情況下,會創建Peer的對象并調用其OnStart方法?
再經過一番折騰,終于確定,在比原中,在下列4種情況Peer.OnStart方法最終會被調用:
比原節點啟動后,主動去連接配置文件指定的種子節點、以及本地數據目錄中addrbook.json中保存的節點的時候
比原監聽本地p2p端口后,有別的節點連上來的時候
啟動PEXReactor,并使用它自己的協議與當前連接上的節點進行通信的時候
在一個沒有用上的Switch.Connect2Switches方法中(可忽略)
第4種情況我們完全忽略。第3種情況中,由于PEXReactor會使用類似于BitTorrent的文件分享協議與其它節點分享數據,邏輯比較獨立,算是一種輔助作用,我們也暫不考慮。這樣我們就只需要分析前兩種情況了。
比原節點啟動時,是如何主動連接其它節點,并最終調用了MConnection.OnStart方法的?首先我們快速走到SyncManager.Start方法:
cmd/bytomd/main.go#L54
func main() { cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir())) cmd.Execute() }
cmd/bytomd/commands/run_node.go#L41
func runNode(cmd *cobra.Command, args []string) error { n := node.NewNode(config) if _, err := n.Start(); err != nil { // ... }
node/node.go#L169
func (n *Node) OnStart() error { // ... n.syncManager.Start() // ... }
netsync/handle.go#L141
func (sm *SyncManager) Start() { go sm.netStart() // ... }
然后我們將進入netStart()方法。在這個方法中,比原將主動連接其它節點:
func (sm *SyncManager) netStart() error { // ... if sm.config.P2P.Seeds != "" { // dial out seeds := strings.Split(sm.config.P2P.Seeds, ",") if err := sm.DialSeeds(seeds); err != nil { return err } } return nil }
這里出現的sm.config.P2P.Seeds,對應的就是本地數據目錄中config.toml中的p2p.seeds中的種子結點。
接著通過sm.DialSeeds去主動連接每個種子:
netsync/handle.go#L229-L231
func (sm *SyncManager) DialSeeds(seeds []string) error { return sm.sw.DialSeeds(sm.addrBook, seeds) }
p2p/switch.go#L311-L340
func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { // ... for i := 0; i < len(perm)/2; i++ { j := perm[i] sw.dialSeed(netAddrs[j]) } // ... }
p2p/switch.go#L342-L349
func (sw *Switch) dialSeed(addr *NetAddress) { peer, err := sw.DialPeerWithAddress(addr, false) // ... }
p2p/switch.go#L351-L392
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { // ... peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) // ... err = sw.AddPeer(peer) // ... }
先是通過newOutboundPeerWithConfig創建了peer,然后把它加入到sw(即Switch對象)中。
p2p/switch.go#L226-L275
func (sw *Switch) AddPeer(peer *Peer) error { // ... // Start peer if sw.IsRunning() { if err := sw.startInitPeer(peer); err != nil { return err } } // ... }
在sw.startInitPeer中,將會調用peer.Start:
p2p/switch.go#L300-L308
func (sw *Switch) startInitPeer(peer *Peer) error { peer.Start() // ... }
而peer.Start對應了Peer.OnStart,最后就是:
p2p/peer.go#L207-L211
func (p *Peer) OnStart() error { p.BaseService.OnStart() _, err := p.mconn.Start() return err }
可以看到,在這里調用了mconn.Start,終于找到了。總結一下就是:
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.DialSeeds -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart
那么,第一種主動連接別的節點的情況就到這里分析完了。下面是第二種情況:
當別的節點連接到本節點時,比原是如何走到MConnection.OnStart方法這一步的?比原節點啟動后,會監聽本地的p2p端口,等待別的節點連接上來。那么這個流程又是什么樣的呢?
由于比原節點的啟動流程在目前的文章中已經多次出現,這里就不貼了,我們直接從Switch.OnStart開始(它是在SyncManager啟動的時候啟動的):
p2p/switch.go#L186-L185
func (sw *Switch) OnStart() error { // ... for _, peer := range sw.peers.List() { sw.startInitPeer(peer) } // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } // ... }
這個方法經過省略以后,還剩兩塊代碼,一塊是startInitPeer(...),一塊是sw.listenerRoutine(listener)。
如果你剛才在讀前一節時留意了,就會發現,startInitPeer(...)方法馬上就會調用Peer.Start。然而在這里需要說明的是,經過我的分析,發現這塊代碼實際上沒有起到任何作用,因為在當前這個時刻,sw.peers總是空的,它里面還沒有來得及被其它的代碼添加進peer。所以我覺得它可以刪掉,以免誤導讀者。(提了一個issue,參見#902)
第二塊代碼,listenerRoutine,如果你還有印象的話,它就是用來監聽本地p2p端口的,在前面“比原是如何監聽p2p端口的”一文中有詳細的講解。
我們今天還是需要再挖掘一下它,看看它到底是怎么走到MConnection.OnStart的:
p2p/switch.go#L498-L536
func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() // ... err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) // ... } }
這里的l就是監聽本地p2p端口的Listener。通過一個for循環,拿到連接到該端口的節點的連接,生成新peer。
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { // ... peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) // ... if err = sw.AddPeer(peer); err != nil { // ... } // ... }
生成新的peer之后,調用了Switch的AddPeer方法。到了這里,就跟前一節一樣了,在AddPeer中將調用sw.startInitPeer(peer),然后調用peer.Start(),最后調用了MConnection.OnStart()。由于代碼一模一樣,就不貼出來了。
總結一下,就是:
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.OnStart -> Switch.listenerRoutine -> Switch.addPeerWithConnectionAndConfig -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart
那么,第二種情況我們也分析完了。
不過到目前為止,我們只解決了這次問題中的第一個小問題,即:我們終于知道了比原代碼會在什么情況來啟動一個MConnection,從而監視sendQueue通道,把要發送的信息數據,轉到了sending通道中。
那么,我們進入下一個小問題:
數據放入通道sending之后,誰又會來取走它們呢?經過分析之后,發現通道sendQueue和sending都屬于類型Channel,只不過兩者作用不同。sendQueue是用來存放待發送的完整的信息數據,而sending更底層一些,它持有的數據可能會被分成多個塊發送。如果只有sendQueue一個通道,那么很難實現分塊的操作的。
而Channel的發送是由MConnection來調用的,幸運的是,當我們一直往回追溯下去,發現竟走到了MConnection.OnStart這里。也就是說,我們在這個小問題中,研究的正好是前面兩個鏈條后面的部分:
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.DialSeeds -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart -> ???
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.OnStart -> Switch.listenerRoutine -> Switch.addPeerWithConnectionAndConfig -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart -> ???
也就是上面的???部分。
那么我們就直接從MConnection.OnStart開始:
p2p/connection.go#L152-L159
func (c *MConnection) OnStart() error { // ... go c.sendRoutine() // ... }
c.sendRoutine()方法就是我們需要的。當MConnection啟動以后,就會開始進行發送操作(等待數據到來)。它的代碼如下:
p2p/connection.go#L289-L343
func (c *MConnection) sendRoutine() { // ... case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() if !eof { // Keep sendRoutine awake. select { case c.send <- struct{}{}: default: } } } // ... }
這個方法本來很長,只是我們省略掉了很多無關的代碼。里面的c.sendSomeMsgPackets()就是我們要找的,但是,我們突然發現,怎么又出來了一個c.send通道?它又有什么用?而且看起來好像只有當這個通道里有東西的時候,我們才會去調用c.sendSomeMsgPackets(),似乎像是一個鈴鐺一樣用來提醒我們。
那么c.send什么時候會有東西呢?檢查了代碼之后,發現在以下3個地方:
p2p/connection.go#L206-L239
func (c *MConnection) Send(chID byte, msg interface{}) bool { // ... success := channel.sendBytes(wire.BinaryBytes(msg)) if success { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: // .. }
p2p/connection.go#L243-L271
func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // ... ok = channel.trySendBytes(wire.BinaryBytes(msg)) if ok { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: // ... }
p2p/connection.go#L289-L343
func (c *MConnection) sendRoutine() { // .... case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() if !eof { // Keep sendRoutine awake. select { case c.send <- struct{}{}: // ... }
如果我們對前一篇文章還有印象,就會記得channel.trySendBytes是在我們想給對方節點發信息時調用的,調用完以后,它會把信息對應的二進制數據放入到channel.sendQueue通道(所以才有了本文)。channel.sendBytes我們目前雖然還沒用到,但是它也應該是類似的。在它們兩個調用完之后,它們都會向c.send通道里放入一個數據,用來通知Channel有數據可以發送了。
而第三個sendRoutine()就是我們剛剛走到的地方。當我們調用c.sendSomeMsgPackets()發送了sending中的一部分之后,如果還有剩余的,則繼續向c.send放個數據,提醒可以繼續發送。
那到目前為止,發送數據涉及到的Channel就有三個了,分別是sendQueue、sending和send。之所以這么復雜,根本原因就是想把數據分塊發送。
為什么要分塊發送呢?這是因為比原希望能控制發送速率,讓節點之間的網速能保持在一個合理的水平。如果不限制的話,一下子發出大量的數據,一是可能會讓接收者來不及處理,二是有可能會被惡意節點利用,請求大量區塊數據把帶寬占滿。
擔心sendQueue、sending和send這三個通道不太好理解,我想到了一個“燒鴨店”的比喻,來理解它們:
sendQueue就像是用來掛烤好的燒鴨的勾子,可以有多個(但對于比原來說,默認只有一個,因為sendQueue的容量默認為1),當有燒鴨烤好以后,就掛在勾子上;
sending是砧板,可以把燒鴨從sendQueue勾子上取下來一只,放在上面切成塊,等待裝盤,一只燒鴨可能可以裝成好幾盤;
而send是鈴鐺,當有人點單后,服務員就會按一下鈴鐺,廚師就從sending砧板上拿幾塊燒鴨放在小盤中放在出餐口。由于廚師非常忙,每次切出一盤后都可能會去做別的事情,而忘了sending砧板上還有燒鴨沒裝盤,所以為了防止自己忘記,他每切出一盤之后,都會看一眼sending砧板,如果還有肉,就會按一下鈴鐺提醒自己繼續裝盤。
好了,理解了send后,我們就可以回到主線,繼續看c.sendSomeMsgPackets()的代碼了:
p2p/connection.go#L347-L360
func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we"re ready we send more than we asked for, // but amortized it should even out. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { if c.sendMsgPacket() { return true } } return false }
c.sendMonitor.Limit的作用是限制發送速率,其中maxMsgPacketTotalSize即每個packet的最大長度為常量10240,第二個參數是預先指定的發送速率,默認值為500KB/s,第三個參數是說,當實際速度過大時,是否暫停發送,直到變得正常。
經過限速的調整后,后面一段就可以正常發送數據了,其中的c.sendMsgPacket是我們繼續要看的方法:
p2p/connection.go#L363-L398
func (c *MConnection) sendMsgPacket() bool { // ... n, err := leastChannel.writeMsgPacketTo(c.bufWriter) // .. c.sendMonitor.Update(int(n)) // ... return false }
這個方法最前面我省略了一大段代碼,其作用是檢查多個channel,結合它們的優先級和已經發的數據量,找到當前最需要發送數據的那個channel,記為leastChannel。
然后就是調用leastChannel.writeMsgPacketTo(c.bufWriter),把當前要發送的一塊數據,寫到bufWriter中。這個bufWriter就是真正與連接對象綁定的一個緩存區,寫入到它里面的數據,會被Go發送出去。它的定義是在創建MConnection的地方:
p2p/connection.go#L114-L118
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
其中minReadBufferSize為1024,minWriteBufferSize為65536。
數據寫到bufWriter以后,我們就不需要關心了,交給Go來操作了。
在leastChannel.writeMsgPacketTo(c.bufWriter)調用完以后,后面會更新c.sendMonitor,這樣它才能繼續正確的限速。
這時我們已經知道數據是怎么發出去的了,但是我們還沒有找到是誰在監視sending里的數據,那讓我們繼續看leastChannel.writeMsgPacketTo:
p2p/connection.go#L655-L663
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() wire.WriteByte(packetTypeMsg, w, &n, &err) wire.WriteBinary(packet, w, &n, &err) if err == nil { ch.recentlySent += int64(n) } return }
其中的ch.nextMsgPacket()是取出下一個要發送的數據塊,那么是從哪里取出呢?是從sending嗎?
其后的代碼是把數據塊對象變成二進制,放入到前面的bufWriter中發送。
繼續ch.nextMsgPacket():
p2p/connection.go#L638-L651
func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))] if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet }
終于看到sending了。從這里可以看出,sending的確是放著很多塊鴨肉的砧板,而packet就是一個小盤,所以需要從先sending中拿出不超過指定長度的數據放到packet中,然后判斷sending里還有沒有剩下的。如果有,則packet的EOF值為0x00,否則為0x01,這樣調用者就知道數據有沒有發完,還需不需要去按那個叫send的鈴。
那么到這里為止,我們就知道原來還是Channel自己在關注sending,并且為了限制發送速度,需要把它切成一個個小塊。
最后就我們的第三個小問題了,其實我們剛才在第二問里已經弄清楚了。
sending中的數據被取走后,又是如何被發送到其它節點的呢?答案就是,sending中的數據被分成一塊塊取出來后,會放入到bufWriter中,就直接被Go的net.Conn對象發送出去了。到這一層面,就不需要我們再繼續深入了。
總結由于本篇中涉及的方法調用比較多,可能看完都亂了,所以在最后,我們前面調用鏈補充完整,放在最后:
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.DialSeeds -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart -> ...
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.OnStart -> Switch.listenerRoutine -> Switch.addPeerWithConnectionAndConfig -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart -> ...
然后是:
MConnection.sendRoutine -> MConnection.send -> MConnection.sendSomeMsgPackets -> MConnection.sendMsgPacket -> MConnection.writeMsgPacketTo -> MConnection.nextMsgPacket -> MConnection.sending
到了最后,我的感覺就是,一個復雜問題最開始看起來很可怕,但是一旦把它分解成小問題之后,每次只關注一個,各個擊破,好像就沒那么復雜了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/24156.html
摘要:到這里,我們總算能夠完整的理解清楚,當我們向一個比原節點請求區塊數據,我們這邊需要怎么做,對方節點又需要怎么做了。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockc... 在上一篇,我們知道了比原是如何把請求區塊數據的信息BlockReque...
摘要:作者比原項目倉庫地址地址在上上篇文章里,我們還剩下一個小問題沒有解決,即前端是如何顯示一個交易的詳細信息的。那我們在本文看一下,比原是如何顯示這個交易的詳細信息的。到今天為止,我們終于把比原是如何創建一個交易的這件事的基本流程弄清楚了。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https:/...
摘要:如果傳的是,就會在內部使用默認的隨機數生成器生成隨機數并生成密鑰。使用的是,生成的是一個形如這樣的全球唯一的隨機數把密鑰以文件形式保存在硬盤上。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockc... 在前一篇,我們探討了從瀏覽器的dashb...
摘要:繼續看生成地址的方法由于這個方法里傳過來的是而不是對象,所以還需要再用查一遍,然后,再調用這個私有方法創建地址該方法可以分成部分在第塊中主要關注的是返回值。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockc... 在比原的dashboard中...
摘要:作者比原項目倉庫地址地址在前一篇中,我們已經知道如何連上一個比原節點的端口,并與對方完成身份驗證。代碼如下可以看到,首先是從眾多的中,找到最合適的那個。到這里,我們其實已經知道比原是如何向其它節點請求區塊數據,以及何時把信息發送出去。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://...
閱讀 661·2021-11-15 11:39
閱讀 2895·2021-10-08 10:04
閱讀 3259·2019-08-30 10:57
閱讀 3021·2019-08-26 13:25
閱讀 1902·2019-08-26 12:14
閱讀 2633·2019-08-23 15:27
閱讀 2992·2019-08-23 15:18
閱讀 1773·2019-08-23 14:26