文章前言
随着区块链技术的发展和应用的普及,以太坊成为最受欢迎和广泛应用的智能合约平台之一,在以太坊网络中区块数据的同步是保证网络的安全性和一致性的关键步骤,了解以太坊区块数据的同步方式对于理解以太坊网络的工作原理以及参与其中的开发和应用具有重要意义,本文将简易介绍以太坊区块数据同步方式的基本概念和原理,我们将探讨以太坊网络中的节点角色,包括全节点、轻节点和验证者节点并解释它们在区块数据同步中的作用,我们还将介绍以太坊区块链中的区块和交易的结构并说明在同步过程中如何验证区块数据的有效性
同步方式
以太坊区块数据的同步方式主要包括全节点同步、轻节点同步和验证者节点同步:
全节点同步
全节点是以太坊网络中最完整的节点,它存储并维护了完整的区块链数据,全节点通过从其他全节点请求区块数据来进行同步,全节点同步的过程称为全节点同步算法,也被称为"以太坊客户端"
全节点同步的步骤如下:
- 初始化:全节点通过从创世区块开始,逐个请求区块数据来初始化自己的区块链数据库。
- 区块请求:全节点向其他全节点发送区块请求,获取缺失的区块数据。
- 区块验证:全节点验证收到的区块数据的有效性,包括校验区块的哈希值、交易的签名和执行合约的结果。
- 区块应用:全节点将验证通过的区块数据应用到自己的区块链数据库中,更新状态和交易历史。
全节点同步方式的优点是可以获得最完整和可信的区块链数据,但它需要大量的存储空间和网络带宽并且同步过程相对较慢
轻节点同步
轻节点是以太坊网络中的一种轻量级节点,它只存储一小部分区块链数据,轻节点通过与特定的全节点进行通信来进行同步,而无需存储整个区块链数据
轻节点同步的步骤如下:
- 初始化:轻节点通过连接到一个或多个全节点,获取初始区块数据
- 区块请求:轻节点向全节点发送区块请求,获取缺失的区块数据
- 区块验证:轻节点验证收到的区块数据的有效性,包括校验区块的哈希值、交易的签名和执行合约的结果
- 状态获取:轻节点通过与全节点的交互,获取执行合约所需的状态数据
- 交易验证:轻节点验证交易的有效性,但不执行合约操作
轻节点同步方式的优点是占用更少的存储空间和网络带宽,同步速度相对较快,但它无法直接验证交易结果,需要依赖全节点来提供状态数据
验证者节点同步:
验证者节点是以太坊2.0网络中的一种节点,主要参与共识和验证区块数据的过程,验证者节点同步与全节点同步略有不同,因为以太坊2.0使用了Proof of Stake(PoS)共识算法
验证者节点同步的步骤如下:
- 初始化:验证者节点通过连接到网络上的其他验证者节点,获取初始区块数据和验证者集合
- 区块请求:验证者节点向其他验证者节点请求缺失的区块数据
- 区块验证:验证者节点验证收到的区块数据的有效性,包括校验区块的哈希值、交易的签名和执行合约的结果
- 状态获取:验证者节点通过与其他验证者节点的交互,获取执行合约所需的状态数据
- 交易验证:验证者节点验证交易的有效性,但不执行合约操作
- 共识参与:验证者节点参与共识过程,例如通过质押和投票来决定下一个区块的验证者
节点角色
在以太坊网络中,有几种不同的节点角色,每个角色都扮演着不同的功能和责任,下面是对以太坊网络中常见的节点角色的详细介绍
全节点(Full Node)
全节点是以太坊网络中最完整的节点,它存储并维护了整个区块链的完整副本。全节点具有以下主要功能:
- 参与共识过程,包括挖矿和验证区块
- 验证交易的有效性和执行智能合约操作
- 通过与其他全节点进行通信,传播交易和区块数据
- 存储所有的区块链数据,包括区块头、交易和智能合约代码
全节点在以太坊网络中起着重要的作用,它们提供了高度的安全性和可靠性,但需要占用大量的存储空间和网络带宽
轻节点(Light Node)
轻节点是以太坊网络中的一种轻量级节点,它只存储了一小部分区块链数据,主要是为了实现轻量级的客户端应用,轻节点具有以下主要功能:
- 存储少量的区块链数据,如区块头和少数的交易数据
- 通过与全节点进行通信,获取缺失的区块数据和状态数据
- 验证交易的有效性,但不执行智能合约操作
- 支持客户端应用程序的开发和使用
轻节点占用较少的存储空间和网络带宽,同步速度相对较快,适合移动设备和资源受限的环境
验证者节点(Validator Node)
验证者节点是以太坊2.0网络中的一种节点角色,它们参与以太坊区块链的共识和验证过程,验证者节点具有以下主要功能
- 存储并验证区块链的一部分数据,包括区块头和少量的交易数据
- 参与共识过程,通过质押资金和投票来决定下一个区块的验证者
- 验证交易的有效性,但不执行智能合约操作
- 通过与其他验证者节点进行通信,传播区块和共识相关的信息
验证者节点在以太坊2.0中引入了Proof of Stake(PoS)共识算法,取代了以太坊1.0中的Proof of Work(PoW)算法并提供了更高的可扩展性和能源效率
源码分析
数据结构
downloader数据结构如下所示:
下面的代码中定义了一个名为Downloader的结构体,它主要用于处理区块链同步过程中的下载任务,下面是结构体中的关键字段解释如下:
- checkpoint:要强制执行头部的检查点块号
- genesis:限制同步的创世块号
- queue:用于选择要下载的哈希值的调度器
- peers:包含活跃的对等节点的集合,从中进行下载
- stateDB:用于存储同步状态的数据库
- stateBloom:用于快速Trie节点和合约代码存在性检查的布隆过滤器
- syncStatsChainOrigin和syncStatsChainHeight:同步开始时的原始块号和已知的最高块号
- syncStatsState:同步状态统计信息
- lightchain:轻客户端链的实例
- blockchain:区块链的实例
- dropPeer:丢弃节点的函数
- synchroniseMock:用于测试时替代synchronise的函数
- synchronising、notified、committed:用于跟踪同步状态的标志
- ancientLimit:可以视为古代数据的最大块号
// filedir:go-ethereum-1.10.2\eth\downloader\downloader.go L96
type Downloader struct {
// WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
// guaranteed to be so aligned, so take advantage of that. For more information,
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events
checkpoint uint64 // Checkpoint block number to enforce head against (e.g. fast sync)
genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
stateDB ethdb.Database // Database to state sync into (and deduplicate via)
stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks
// Statistics 统计信息,
syncStatsChainOrigin uint64 // Origin block number where syncing started at
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
lightchain LightChain
blockchain BlockChain
// Callbacks
dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
notified int32
committed int32
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
// Channels
headerCh chan dataPack // Channel receiving inbound block headers header的输入通道,从网络下载的header会被送到这个通道
bodyCh chan dataPack // Channel receiving inbound block bodies bodies的输入通道,从网络下载的bodies会被送到这个通道
receiptCh chan dataPack // Channel receiving inbound receipts receipts的输入通道,从网络下载的receipts会被送到这个通道
bodyWakeCh chan bool // Channel to signal the block body fetcher of new tasks 用来传输body fetcher新任务的通道
receiptWakeCh chan bool // Channel to signal the receipt fetcher of new tasks 用来传输receipt fetcher 新任务的通道
headerProcCh chan []*types.Header // Channel to feed the header processor new tasks 通道为header处理者提供新的任务
// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates
snapSync bool // Whether to run state sync over the snap protocol
SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
stateSyncStart chan *stateSync //启动新的state fetcher
trackStateReq chan *stateReq
stateCh chan dataPack // Channel receiving inbound node state data State的输入通道,从网络下载的State会被送到这个通道
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers
cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited.
quitCh chan struct{} // Quit channel to signal termination
quitLock sync.Mutex // Lock to prevent double closes
// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
构造方法
下面的New函数主要用于创建一个新的Downloader实例,该实例用于从远程节点获取哈希和区块,函数的参数包括:
- checkpoint:检查点块号,用于强制执行头部检查点
- stateDb:用于同步状态的数据库
- stateBloom:用于快速Trie节点和合约代码存在性检查的布隆过滤器
- mux:事件多路复用器,用于发布同步操作事件
- chain:区块链实例
- lightchain:轻客户端链实例
- dropPeer:丢弃节点的函数
函数首先进行一些初始化工作,包括设置默认值、创建通道和实例化Downloader结构体,随后函数启动两个并发的goroutine:
- qosTuner():用于计算rttEstimate和rttConfidence,即往返时间估计值和置信度
- stateFetcher():用于启动状态同步任务的监听
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
stateDB: stateDb,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
SnapSyncer: snap.NewSyncer(stateDb),
stateSyncStart: make(chan *stateSync),
syncStatsState: stateSyncStats{
processed: rawdb.ReadFastTrieProgress(stateDb),
},
trackStateReq: make(chan *stateReq),
}
go dl.qosTuner() //计算rttEstimate和rttConfidence
go dl.stateFetcher() //启动stateFetcher的任务监听
return dl
}
同步下载
区块同步始于Synchronise函数,在这里会直接调用synchronise进行同步,如果同步过程中出现错误,则删除掉Peer:
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode)
switch err {
case nil, errBusy, errCanceled:
return err
}
if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
} else {
d.dropPeer(id)
}
return err
}
log.Warn("Synchronisation failed, retrying", "err", err)
return err
}
synchronise函数实现代码如下:
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
// Mock out the synchronisation if testing
if d.synchroniseMock != nil {
return d.synchroniseMock(id, hash)
}
// Make sure only one goroutine is ever allowed past this point at once // 只能运行一个, 检查是否正在运行
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
return errBusy
}
defer atomic.StoreInt32(&d.synchronising, 0)
// Post a user notification of the sync (only once per session) // 发布同步的用户通知(每个会话仅一次)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info("Block synchronisation started")
}
// If we are already full syncing, but have a fast-sync bloom filter laying
// around, make sure it doesn't use memory any more. This is a special case
// when the user attempts to fast sync a new empty network.
if mode == FullSync && d.stateBloom != nil {
d.stateBloom.Close()
}
// If snap sync was requested, create the snap scheduler and switch to fast
// sync mode. Long term we could drop fast sync or merge the two together,
// but until snap becomes prevalent, we should support both. TODO(karalabe).
if mode == SnapSync {
if !d.snapSync {
log.Warn("Enabling snapshot sync prototype")
d.snapSync = true
}
mode = FastSync
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) // 重置queue的状态
d.peers.Reset() // 重置peer的状态
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { // 清空d.bodyWakeCh, d.receiptWakeCh
select {
case <-ch:
default:
}
}
for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { //清空d.headerCh, d.bodyCh, d.receiptCh
for empty := false; !empty; {
select {
case <-ch:
default:
empty = true
}
}
}
for empty := false; !empty; { // 清空headerProcCh
select {
case <-d.headerProcCh:
default:
empty = true
}
}
// Create cancel channel for aborting mid-flight and mark the master peer
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
d.cancelPeer = id
d.cancelLock.Unlock()
defer d.Cancel() // No matter what, we can't leave the cancel channel open
// Atomically set the requested sync mode
atomic.StoreUint32(&d.mode, uint32(mode))
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
if p == nil {
return errUnknownPeer
}
return d.syncWithPeer(p, hash, td) // 基于哈希链从指定的peer和head hash开始块同步
}
syncWithPeer函数用于基于给定节点和头部哈希开始块同步,函数首先进行一些准备工作,包括发布StartEvent事件,设置错误处理和完成处理,然后函数检查给定节点的版本是否小于64,如果是则返回错误,随后函数根据当前的同步模式获取模式(FastSync或FullSync),紧接着函数调用fetchHead方法获取同步边界,即公共祖先和目标块,如果是FastSync模式且未返回pivot块,则将pivot块设置为当前块链的头部块,然后函数通过调用findAncestor方法找到共同祖先以便确定同步的起始点,接下来函数更新同步统计信息并根据同步模式进行一些处理,包括更新共同祖先点和写入数据库中的pivot点,接着函数设置一些初始值并根据情况更新ancientLimit,最后函数调用spawnSync方法来启动同步过程并返回结果,代码如下(可以查看下面的具体注释信息哦~)
// filedir:go-ethereum-1.10.2\eth\downloader\downloader.go L448
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
latest := d.lightchain.CurrentHeader()
d.mux.Post(DoneEvent{latest})
}
}()
if p.version < 64 {
return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, 64)
}
mode := d.getMode()
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
}(time.Now())
// Look up the sync boundaries: the common ancestor and the target block
latest, pivot, err := d.fetchHead(p)
if err != nil {
return err
}
if mode == FastSync && pivot == nil {
// If no pivot block was returned, the head is below the min full block
// threshold (i.e. new chian). In that case we won't really fast sync
// anyway, but still need a valid pivot block to avoid some code hitting
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()
origin, err := d.findAncestor(p, latest) // 通过findAncestor来获取共同祖先,以便找到一个开始同步的点
if err != nil {
return err
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = height
d.syncStatsLock.Unlock()
// Ensure our origin point is below any fast sync pivot point
if mode == FastSync {
if height <= uint64(fsMinFullBlocks) { // 如果对端节点的height小于64,则共同祖先更新为0
origin = 0
} else { // 否则更新pivot为对端节点height-64
pivotNumber := pivot.Number.Uint64()
if pivotNumber <= origin { // 如果pivot小于共同祖先,则更新共同祖先为pivot的前一个
origin = pivotNumber - 1
}
// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
}
}
d.committed = 1
if mode == FastSync && pivot.Number.Uint64() != 0 {
d.committed = 0
}
if mode == FastSync {
// Set the ancient data limitation.
// If we are running fast sync, all block data older than ancientLimit will be
// written to the ancient store. More recent data will be written to the active
// database and will wait for the freezer to migrate.
//
// If there is a checkpoint available, then calculate the ancientLimit through
// that. Otherwise calculate the ancient limit through the advertised height
// of the remote peer.
//
// The reason for picking checkpoint first is that a malicious peer can give us
// a fake (very high) height, forcing the ancient limit to also be very high.
// The peer would start to feed us valid blocks until head, resulting in all of
// the blocks might be written into the ancient store. A following mini-reorg
// could cause issues.
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
} else {
d.ancientLimit = 0
}
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
// If a part of blockchain data has already been written into active store,
// disable the ancient style insertion explicitly.
if origin >= frozen && frozen != 0 {
d.ancientLimit = 0
log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
} else if d.ancientLimit > 0 {
log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
}
// Rewind the ancient store and blockchain if reorg happens.
if origin+1 < frozen {
if err := d.lightchain.SetHead(origin + 1); err != nil {
return err
}
}
}
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, mode) // 更新queue的值从共同祖先+1开始,即从共同祖先开始sync区块
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, td) },
}
if mode == FastSync { //根据模式的不同,增加新的处理逻辑
d.pivotLock.Lock()
d.pivotHeader = pivot
d.pivotLock.Unlock()
fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
} else if mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
return d.spawnSync(fetchers)
}
spawnSync会给每个fetcher启动一个goroutine, 然后阻塞的等待fetcher出错:
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers []func() error) error {
errc := make(chan error, len(fetchers))
d.cancelWg.Add(len(fetchers))
for _, fn := range fetchers {
fn := fn
go func() { defer d.cancelWg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
for i := 0; i < len(fetchers); i++ {
if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
d.queue.Close()
}
if err = <-errc; err != nil && err != errCanceled {
break
}
}
d.queue.Close()
d.Cancel()
return err
}
同步State
state即世界状态,其保存着所有账户的余额等信息
// filedir: go-ethereum-1.10.2\eth\downloader\statesync.go
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
for {
select {
case s := <-d.stateSyncStart:
for next := s; next != nil; {
next = d.runStateSync(next)
}
case <-d.stateCh:
// Ignore state responses while no sync is running.
case <-d.quitCh:
return
}
}
}
runStateSync函数执行状态同步,直到它完成或请求切换到另一个根哈希,函数首先初始化了一些变量,包括active(当前正在进行中的请求)、finished(已完成或失败的请求)、timeout(超时的活动请求)等,紧接着函数启动一个goroutine来运行状态同步并在函数结束时取消同步,函数还订阅了peerDrop通道,用于接收对等节点断开连接的事件以便取消已分配的任务,最后函数进入一个无限循环,在循环中通过select语句监听多个通道的事件:
// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
var (
active = make(map[string]*stateReq) // Currently in-flight requests
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
log.Trace("State sync starting", "root", s.root)
defer func() {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
}()
go s.run()
defer s.Cancel()
// Listen for peer departure events to cancel assigned tasks
peerDrop := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
defer peerSub.Unsubscribe()
for {
// Enable sending of the first buffered element if there is one.
var (
deliverReq *stateReq
deliverReqCh chan *stateReq
)
if len(finished) > 0 {
deliverReq = finished[0]
deliverReqCh = s.deliver
}
select {
// The stateSync lifecycle:
case next := <-d.stateSyncStart:
d.spindownStateSync(active, finished, timeout, peerDrop)
return next
case <-s.done:
d.spindownStateSync(active, finished, timeout, peerDrop)
return nil
// Send the next finished request to the current sync:
case deliverReqCh <- deliverReq:
// Shift out the first request, but also set the emptied slot to nil for GC
copy(finished, finished[1:])
finished[len(finished)-1] = nil
finished = finished[:len(finished)-1]
// Handle incoming state packs:
case pack := <-d.stateCh:
// Discard any data not requested (or previously timed out)
req := active[pack.PeerId()]
if req == nil {
log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.response = pack.(*statePack).states
req.delivered = time.Now()
finished = append(finished, req)
delete(active, pack.PeerId())
// Handle dropped peer connections:
case p := <-peerDrop:
// Skip if no request is currently pending
req := active[p.id]
if req == nil {
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.dropped = true
req.delivered = time.Now()
finished = append(finished, req)
delete(active, p.id)
// Handle timed-out requests:
case req := <-timeout:
// If the peer is already requesting something else, ignore the stale timeout.
// This can happen when the timeout and the delivery happens simultaneously,
// causing both pathways to trigger.
if active[req.peer.id] != req {
continue
}
req.delivered = time.Now()
// Move the timed out data back into the download queue
finished = append(finished, req)
delete(active, req.peer.id)
// Track outgoing state requests:
case req := <-d.trackStateReq:
// If an active request already exists for this peer, we have a problem. In
// theory the trie node schedule must never assign two requests to the same
// peer. In practice however, a peer might receive a request, disconnect and
// immediately reconnect before the previous times out. In this case the first
// request is never honored, alas we must not silently overwrite it, as that
// causes valid requests to go missing and sync to get stuck.
if old := active[req.peer.id]; old != nil {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Move the previous request to the finished set
old.timer.Stop()
old.dropped = true
old.delivered = time.Now()
finished = append(finished, old)
}
// Start a timer to notify the sync loop if the peer stalled.
req.timer = time.AfterFunc(req.timeout, func() {
timeout <- req
})
active[req.peer.id] = req
}
}
}
未分析完,下篇再续~