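- Initialization: a full node builds its blockchain database by requesting blocks one at a time, starting from the genesis block.
- Block request: the full node asks other full nodes for the blocks it is missing.
- Block validation: the full node checks every received block, including the block hash, the transaction signatures, and the results of contract execution.
- Block application: the full node applies each validated block to its local database, updating the state and the transaction history.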
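The request, validate, apply cycle above can be sketched in a few lines of Go. This is a minimal illustration only: `Block`, `Chain`, `Apply`, and `SyncFrom` are hypothetical names, SHA-256 stands in for Ethereum's Keccak-256, and validation is reduced to checking the block number and parent hash.

```go
package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
)

// Block is a simplified, hypothetical block; real Ethereum blocks carry far more data.
type Block struct {
	Number     uint64
	ParentHash [32]byte
	Payload    string
}

// Hash derives the block hash from the block contents (SHA-256 as a stand-in).
func (b Block) Hash() [32]byte {
	return sha256.Sum256([]byte(fmt.Sprintf("%d|%x|%s", b.Number, b.ParentHash, b.Payload)))
}

// Chain is the local database, reduced to an in-memory slice that starts at genesis.
type Chain struct{ blocks []Block }

// Apply checks that b extends the current head and appends it.
func (c *Chain) Apply(b Block) error {
	head := c.blocks[len(c.blocks)-1]
	if b.Number != head.Number+1 {
		return errors.New("unexpected block number")
	}
	if b.ParentHash != head.Hash() {
		return errors.New("parent hash mismatch")
	}
	c.blocks = append(c.blocks, b)
	return nil
}

// SyncFrom requests blocks one by one until the peer has nothing newer.
// The peer is modelled as a plain function.
func (c *Chain) SyncFrom(fetch func(number uint64) (Block, bool)) error {
	for {
		next := c.blocks[len(c.blocks)-1].Number + 1
		b, ok := fetch(next)
		if !ok {
			return nil
		}
		if err := c.Apply(b); err != nil {
			return err
		}
	}
}

func main() {
	genesis := Block{Number: 0, Payload: "genesis"}
	remote := []Block{genesis}
	for i := uint64(1); i <= 3; i++ {
		remote = append(remote, Block{Number: i, ParentHash: remote[i-1].Hash(), Payload: fmt.Sprintf("block-%d", i)})
	}
	local := &Chain{blocks: []Block{genesis}}
	err := local.SyncFrom(func(n uint64) (Block, bool) {
		if n >= uint64(len(remote)) {
			return Block{}, false
		}
		return remote[n], true
	})
	fmt.Println(len(local.blocks), err) // prints "4 <nil>"
}
```

A real full node downloads from many peers concurrently and re-executes every transaction, which is what the Downloader code later in these notes coordinates.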
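- Initialization: a light node connects to one or more full nodes to obtain its initial block data.
- Block request: the light node asks full nodes for the block data it is missing, mainly block headers.
- Block validation: the light node checks the block headers it receives (hash and parent linkage) and verifies any other data it requests against the roots committed in those headers. It does not re-execute contracts.
- State retrieval: the light node fetches the state data needed for contract calls from full nodes on demand.
- Transaction validation: the light node checks that transactions are valid but does not execute contract operations.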
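One way a light node can check data it did not download in full is a Merkle proof against a root stored in a trusted header. The sketch below assumes a plain binary Merkle tree with SHA-256. Ethereum itself uses Merkle-Patricia tries with Keccak-256, so `ProofStep` and `VerifyProof` are hypothetical names that illustrate the idea only, not the actual light-client protocol.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// leafHash hashes a leaf value.
func leafHash(data []byte) []byte {
	h := sha256.Sum256(data)
	return h[:]
}

// hashPair hashes the concatenation of a left and a right child.
func hashPair(l, r []byte) []byte {
	h := sha256.Sum256(append(append([]byte{}, l...), r...))
	return h[:]
}

// ProofStep is one sibling hash on the path from a leaf up to the root.
type ProofStep struct {
	Sibling []byte
	Left    bool // true if the sibling is the left child at this level
}

// VerifyProof recomputes the root from the leaf and the proof and compares
// it with the trusted root taken from a block header.
func VerifyProof(root, leaf []byte, proof []ProofStep) bool {
	cur := leafHash(leaf)
	for _, s := range proof {
		if s.Left {
			cur = hashPair(s.Sibling, cur)
		} else {
			cur = hashPair(cur, s.Sibling)
		}
	}
	return bytes.Equal(cur, root)
}

func main() {
	// Four-leaf tree built by hand.
	l := [][]byte{[]byte("tx0"), []byte("tx1"), []byte("tx2"), []byte("tx3")}
	n01 := hashPair(leafHash(l[0]), leafHash(l[1]))
	n23 := hashPair(leafHash(l[2]), leafHash(l[3]))
	root := hashPair(n01, n23)

	// Proof for tx2: sibling tx3 on the right, then n01 on the left.
	proof := []ProofStep{{Sibling: leafHash(l[3]), Left: false}, {Sibling: n01, Left: true}}
	fmt.Println(VerifyProof(root, l[2], proof))             // true
	fmt.Println(VerifyProof(root, []byte("forged"), proof)) // false
}
```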
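A validator node is a node type in the Ethereum 2.0 network that mainly takes part in consensus and block validation. Validator sync differs slightly from full-node sync because Ethereum 2.0 uses the Proof of Stake (PoS) consensus algorithm.
- Initialization: the validator node connects to other validator nodes on the network to obtain initial block data and the validator set.
- Block request: the validator node asks other validator nodes for the blocks it is missing.
- Block validation: the validator node checks every received block, including the block hash, the transaction signatures, and the results of contract execution.
- State retrieval: the validator node obtains the state data needed for contract execution from other validator nodes.
- Transaction validation: the validator node checks that transactions are valid but does not execute contract operations.
- Consensus participation: the validator node takes part in consensus by staking and by voting (attesting) on blocks. Block proposers are drawn pseudo-randomly from the staked validator set rather than elected by vote.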
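Full Node
- Can take part in consensus: it validates every block and, under Proof of Work, may also mine
- Validates transactions and executes smart contract operations
- Communicates with other full nodes to propagate transactions and blocks
- Stores all blockchain data, including block headers, transactions, and smart contract code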
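Light Node
- Stores only a small part of the blockchain data, such as block headers and a few transactions
- Communicates with full nodes to fetch missing block data and state data
- Validates transactions but does not execute smart contract operations
- Supports building and running client applications, since it needs few resources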
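Validator Node
- Stores and validates part of the blockchain data, including block headers and a small amount of transaction data
- Takes part in consensus by staking funds and voting (attesting) on blocks
- Validates transactions but does not execute smart contract operations
- Communicates with other validator nodes to propagate blocks and consensus messages
Validator nodes were introduced with the Proof of Stake (PoS) consensus algorithm in Ethereum 2.0, which replaces the Proof of Work (PoW) algorithm of Ethereum 1.0 and aims to provide better scalability and energy efficiency.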
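- checkpoint: checkpoint block number that the chain head is enforced against
- genesis: genesis block number that limits how far back sync may go
- queue: scheduler that selects which hashes to download
- peers: set of active peers that data can be downloaded from
- stateDB: database that state sync writes into
- stateBloom: bloom filter for fast existence checks of trie nodes and contract code
- syncStatsChainOrigin and syncStatsChainHeight: the origin block number and the highest known block number at the moment sync started
- syncStatsState: state sync statistics
- lightchain: light client chain instance
- blockchain: blockchain instance
- dropPeer: callback that drops a misbehaving peer
- synchroniseMock: replacement for synchronise during testing
- synchronising, notified, committed: flags that track the sync status
- ancientLimit: highest block number that can be treated as ancient data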
// filedir:go-ethereum-1.10.2\eth\downloader\downloader.go L96
type Downloader struct {
	// WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
	// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
	// guaranteed to be so aligned, so take advantage of that. For more information,
	// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

	mode uint32         // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
	mux  *event.TypeMux // Event multiplexer to announce sync operation events

	checkpoint uint64   // Checkpoint block number to enforce head against (e.g. fast sync)
	genesis    uint64   // Genesis block number to limit sync to (e.g. light client CHT)
	queue      *queue   // Scheduler for selecting the hashes to download
	peers      *peerSet // Set of active peers from which download can proceed

	stateDB    ethdb.Database  // Database to state sync into (and deduplicate via)
	stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks

	// Statistics
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

	lightchain LightChain
	blockchain BlockChain

	// Callbacks
	dropPeer peerDropFn // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32
	committed       int32
	ancientLimit    uint64 // The maximum block number which can be regarded as ancient data.

	// Channels
	headerCh      chan dataPack        // Channel receiving inbound block headers downloaded from the network
	bodyCh        chan dataPack        // Channel receiving inbound block bodies downloaded from the network
	receiptCh     chan dataPack        // Channel receiving inbound receipts downloaded from the network
	bodyWakeCh    chan bool            // Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // Channel to feed the header processor new tasks

	// State sync
	pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
	pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates

	snapSync       bool         // Whether to run state sync over the snap protocol
	SnapSyncer     *snap.Syncer // TODO(karalabe): make private! hack for now
	stateSyncStart chan *stateSync // Starts a new state fetcher
	trackStateReq  chan *stateReq
	stateCh        chan dataPack // Channel receiving inbound node state data downloaded from the network

	// Cancellation and termination
	cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
	cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.

	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.Mutex    // Lock to prevent double closes

	// Testing hooks
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
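- checkpoint: checkpoint block number that the chain head is enforced against
- stateDb: database that state sync writes into
- stateBloom: bloom filter for fast existence checks of trie nodes and contract code
- mux: event multiplexer used to announce sync operation events
- chain: blockchain instance
- lightchain: light client chain instance
- dropPeer: callback that drops a misbehaving peer
- qosTuner(): goroutine that computes rttEstimate and rttConfidence, the round-trip time estimate and its confidence
- stateFetcher(): goroutine that listens for state sync tasks to start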
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
	if lightchain == nil {
		lightchain = chain
	}
	dl := &Downloader{
		stateDB:        stateDb,
		stateBloom:     stateBloom,
		mux:            mux,
		checkpoint:     checkpoint,
		queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
		peers:          newPeerSet(),
		rttEstimate:    uint64(rttMaxEstimate),
		rttConfidence:  uint64(1000000),
		blockchain:     chain,
		lightchain:     lightchain,
		dropPeer:       dropPeer,
		headerCh:       make(chan dataPack, 1),
		bodyCh:         make(chan dataPack, 1),
		receiptCh:      make(chan dataPack, 1),
		bodyWakeCh:     make(chan bool, 1),
		receiptWakeCh:  make(chan bool, 1),
		headerProcCh:   make(chan []*types.Header, 1),
		quitCh:         make(chan struct{}),
		stateCh:        make(chan dataPack),
		SnapSyncer:     snap.NewSyncer(stateDb),
		stateSyncStart: make(chan *stateSync),
		syncStatsState: stateSyncStats{
			processed: rawdb.ReadFastTrieProgress(stateDb),
		},
		trackStateReq: make(chan *stateReq),
	}
	go dl.qosTuner()     // Computes rttEstimate and rttConfidence
	go dl.stateFetcher() // Listens for state sync tasks to start
	return dl
}
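The wake channels created in New are `chan bool` with capacity 1. A plausible reason for that size is that it allows a non-blocking "there is new work" signal in which repeated signals coalesce. The sketch below shows that pattern in isolation; the helper name `wake` is hypothetical and not part of the downloader.

```go
package main

import "fmt"

// wake signals a worker without ever blocking the caller. If a signal is
// already pending in the one-slot buffer, the new one is dropped, since
// one pending signal is enough to make the worker look for tasks.
func wake(ch chan bool) bool {
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

func main() {
	wakeCh := make(chan bool, 1)
	fmt.Println(wake(wakeCh)) // true: the slot was free
	fmt.Println(wake(wakeCh)) // false: a signal is already pending
	<-wakeCh                  // the worker consumes the signal
	fmt.Println(wake(wakeCh)) // true again
}
```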
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)

	switch err {
	case nil, errBusy, errCanceled:
		return err
	}
	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
		errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
		if d.dropPeer == nil {
			// The dropPeer method is nil when `--copydb` is used for a local copy.
			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
		} else {
			d.dropPeer(id)
		}
		return err
	}
	log.Warn("Synchronisation failed, retrying", "err", err)
	return err
}
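Synchronise classifies failures with `errors.Is` rather than `==`, so an error that wraps one of the sentinel values with extra context still matches. The sketch below illustrates that behaviour with stand-in sentinels. `shouldDrop` and `wrap` are hypothetical helpers, and only the `errors` and `fmt` calls are real library API.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in sentinel errors; the real ones live in the downloader package.
var (
	errBadPeer = errors.New("bad peer")
	errTimeout = errors.New("timeout")
	errBusy    = errors.New("busy")
)

// wrap adds context to a sentinel error while keeping it matchable.
func wrap(err error, msg string) error {
	return fmt.Errorf("%w: %s", err, msg)
}

// shouldDrop reports whether err, possibly wrapped, is a peer-fault error.
func shouldDrop(err error) bool {
	for _, target := range []error{errBadPeer, errTimeout} {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}

func main() {
	wrapped := wrap(errTimeout, "header batch stalled")
	fmt.Println(shouldDrop(wrapped))   // true: errors.Is unwraps
	fmt.Println(shouldDrop(errBusy))   // false: not a peer fault
	fmt.Println(wrapped == errTimeout) // false: plain comparison misses it
}
```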
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
	// Mock out the synchronisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)

	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		log.Info("Block synchronisation started")
	}
	// If we are already full syncing, but have a fast-sync bloom filter laying
	// around, make sure it doesn't use memory any more. This is a special case
	// when the user attempts to fast sync a new empty network.
	if mode == FullSync && d.stateBloom != nil {
		d.stateBloom.Close()
	}
	// If snap sync was requested, create the snap scheduler and switch to fast
	// sync mode. Long term we could drop fast sync or merge the two together,
	// but until snap becomes prevalent, we should support both. TODO(karalabe).
	if mode == SnapSync {
		if !d.snapSync {
			log.Warn("Enabling snapshot sync prototype")
			d.snapSync = true
		}
		mode = FastSync
	}
	// Reset the queue, peer set and wake channels to clean any internal leftover state
	d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) // Reset the queue state
	d.peers.Reset()                                           // Reset the peer state

	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { // Drain d.bodyWakeCh and d.receiptWakeCh
		select {
		case <-ch:
		default:
		}
	}
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { // Drain d.headerCh, d.bodyCh and d.receiptCh
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
	for empty := false; !empty; { // Drain headerProcCh
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
	// Create cancel channel for aborting mid-flight and mark the master peer
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelPeer = id
	d.cancelLock.Unlock()

	defer d.Cancel() // No matter what, we can't leave the cancel channel open

	// Atomically set the requested sync mode
	atomic.StoreUint32(&d.mode, uint32(mode))

	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash, td) // Start block sync from the given peer and head hash, based on the hash chain
}
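Two idioms from synchronise are worth isolating: the compare-and-swap guard that lets only one sync cycle run at a time, and the non-blocking loop that drains leftover items from a buffered channel. The sketch below reproduces both under simplified assumptions. The `syncer` type and the `drain` and `run` names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errBusy = errors.New("busy")

// syncer is a stand-in for the downloader: one flag and one result channel.
type syncer struct {
	running int32
	results chan int
}

// drain removes everything currently buffered in ch without blocking
// and returns how many items were discarded.
func drain(ch chan int) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

// run executes work if no other run is in progress, otherwise returns errBusy.
func (s *syncer) run(work func()) error {
	if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&s.running, 0)

	drain(s.results) // start from a clean state
	work()
	return nil
}

func main() {
	s := &syncer{results: make(chan int, 4)}
	s.results <- 1 // stale data from an earlier cycle
	s.results <- 2

	err := s.run(func() {
		fmt.Println(s.run(func() {})) // busy: a run is already active
		fmt.Println(len(s.results))   // 0: stale items were drained
	})
	fmt.Println(err) // <nil>
}
```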
// filedir:go-ethereum-1.10.2\eth\downloader\downloader.go L448
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.mux.Post(FailedEvent{err})
		} else {
			latest := d.lightchain.CurrentHeader()
			d.mux.Post(DoneEvent{latest})
		}
	}()
	if p.version < 64 {
		return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, 64)
	}
	mode := d.getMode()

	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
	defer func(start time.Time) {
		log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
	}(time.Now())

	// Look up the sync boundaries: the common ancestor and the target block
	latest, pivot, err := d.fetchHead(p)
	if err != nil {
		return err
	}
	if mode == FastSync && pivot == nil {
		// If no pivot block was returned, the head is below the min full block
		// threshold (i.e. new chain). In that case we won't really fast sync
		// anyway, but still need a valid pivot block to avoid some code hitting
		// nil panics on an access.
		pivot = d.blockchain.CurrentBlock().Header()
	}
	height := latest.Number.Uint64()

	origin, err := d.findAncestor(p, latest) // Find the common ancestor, which gives the point to start syncing from
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()

	// Ensure our origin point is below any fast sync pivot point
	if mode == FastSync {
		if height <= uint64(fsMinFullBlocks) { // The peer's height is at most fsMinFullBlocks: sync everything from block 0
			origin = 0
		} else { // Otherwise use the pivot header returned by fetchHead
			pivotNumber := pivot.Number.Uint64()
			if pivotNumber <= origin { // The pivot is not above the common ancestor: move the origin to just below the pivot
				origin = pivotNumber - 1
			}
			// Write out the pivot into the database so a rollback beyond it will
			// reenable fast sync
			rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
		}
	}
	d.committed = 1
	if mode == FastSync && pivot.Number.Uint64() != 0 {
		d.committed = 0
	}
	if mode == FastSync {
		// Set the ancient data limitation.
		// If we are running fast sync, all block data older than ancientLimit will be
		// written to the ancient store. More recent data will be written to the active
		// database and will wait for the freezer to migrate.
		//
		// If there is a checkpoint available, then calculate the ancientLimit through
		// that. Otherwise calculate the ancient limit through the advertised height
		// of the remote peer.
		//
		// The reason for picking checkpoint first is that a malicious peer can give us
		// a fake (very high) height, forcing the ancient limit to also be very high.
		// The peer would start to feed us valid blocks until head, resulting in all of
		// the blocks might be written into the ancient store. A following mini-reorg
		// could cause issues.
		if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
			d.ancientLimit = d.checkpoint
		} else if height > fullMaxForkAncestry+1 {
			d.ancientLimit = height - fullMaxForkAncestry - 1
		} else {
			d.ancientLimit = 0
		}
		frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.

		// If a part of blockchain data has already been written into active store,
		// disable the ancient style insertion explicitly.
		if origin >= frozen && frozen != 0 {
			d.ancientLimit = 0
			log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
		} else if d.ancientLimit > 0 {
			log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
		}
		// Rewind the ancient store and blockchain if reorg happens.
		if origin+1 < frozen {
			if err := d.lightchain.SetHead(origin + 1); err != nil {
				return err
			}
		}
	}
	// Initiate the sync using a concurrent header and content retrieval algorithm
	d.queue.Prepare(origin+1, mode) // Point the queue at origin+1, so sync starts right after the common ancestor
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}
	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, td) },
	}
	if mode == FastSync { // Add the content processor that matches the sync mode
		d.pivotLock.Lock()
		d.pivotHeader = pivot
		d.pivotLock.Unlock()

		fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
	} else if mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	return d.spawnSync(fetchers)
}
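The origin clamp and the ancient-limit choice in syncWithPeer are pure arithmetic, so they can be tried out in isolation. The sketch below extracts them into two hypothetical functions, `adjustOrigin` and `ancientLimit`. The constant values 64 and 90000 are assumptions about `fsMinFullBlocks` and `fullMaxForkAncestry` in this release and should be checked against the source.

```go
package main

import "fmt"

const (
	fsMinFullBlocks     = 64    // assumed value of the downloader constant
	fullMaxForkAncestry = 90000 // assumed value of the downloader constant
)

// adjustOrigin mirrors the fast-sync clamp: short chains sync from block 0,
// and the origin is never allowed to sit at or above the pivot.
func adjustOrigin(origin, height, pivot uint64) uint64 {
	if height <= fsMinFullBlocks {
		return 0
	}
	if pivot <= origin {
		return pivot - 1
	}
	return origin
}

// ancientLimit prefers a trusted checkpoint over the height a peer advertises,
// since a peer can lie about its height.
func ancientLimit(checkpoint, height uint64) uint64 {
	switch {
	case checkpoint != 0 && checkpoint > fullMaxForkAncestry+1:
		return checkpoint
	case height > fullMaxForkAncestry+1:
		return height - fullMaxForkAncestry - 1
	default:
		return 0
	}
}

func main() {
	fmt.Println(adjustOrigin(10, 50, 0))          // 0: chain shorter than fsMinFullBlocks
	fmt.Println(adjustOrigin(950, 1000, 936))     // 935: origin pushed below the pivot
	fmt.Println(adjustOrigin(100, 1000, 936))     // 100: already below the pivot
	fmt.Println(ancientLimit(0, 1_000_000))       // 909999: derived from the peer height
	fmt.Println(ancientLimit(500_000, 1_000_000)) // 500000: checkpoint wins
	fmt.Println(ancientLimit(0, 5_000))           // 0: chain too short
}
```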
spawnSync starts one goroutine per fetcher, then blocks until every fetcher has returned or one of them reports an error other than errCanceled:
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers []func() error) error {
	errc := make(chan error, len(fetchers))
	d.cancelWg.Add(len(fetchers))
	for _, fn := range fetchers {
		fn := fn
		go func() { defer d.cancelWg.Done(); errc <- fn() }()
	}
	// Wait for the first error, then terminate the others.
	var err error
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
		if err = <-errc; err != nil && err != errCanceled {
			break
		}
	}
	d.queue.Close()
	d.Cancel()
	return err
}
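The same "run everything, return the first real error, then stop the rest" pattern can be written without any downloader state. In the sketch below the cancel channel and the WaitGroup play the roles of `d.Cancel()` and `d.cancelWg`. The `spawn` function and its task signature are hypothetical simplifications, not the downloader's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errCanceled = errors.New("canceled")
	errBoom     = errors.New("boom")
)

// spawn runs every task in its own goroutine and returns the first error
// that is neither nil nor errCanceled. The remaining tasks are told to stop
// through the cancel channel and are waited for before spawn returns.
func spawn(tasks []func(cancel <-chan struct{}) error) error {
	errc := make(chan error, len(tasks)) // buffered so no task blocks on send
	cancel := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for _, fn := range tasks {
		fn := fn
		go func() { defer wg.Done(); errc <- fn(cancel) }()
	}
	var err error
	for i := 0; i < len(tasks); i++ {
		if err = <-errc; err != nil && err != errCanceled {
			break
		}
	}
	close(cancel)
	wg.Wait()
	return err
}

func main() {
	err := spawn([]func(<-chan struct{}) error{
		func(c <-chan struct{}) error { <-c; return errCanceled }, // runs until cancelled
		func(c <-chan struct{}) error { return errBoom },          // fails immediately
	})
	fmt.Println(err) // boom
}
```

Sizing `errc` to the number of tasks matters: after the loop breaks early, the remaining goroutines can still send their result and exit instead of leaking.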
// filedir: go-ethereum-1.10.2\eth\downloader\statesync.go
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
	for {
		select {
		case s := <-d.stateSyncStart:
			for next := s; next != nil; {
				next = d.runStateSync(next)
			}
		case <-d.stateCh:
			// Ignore state responses while no sync is running.
		case <-d.quitCh:
			return
		}
	}
}
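The inner loop of stateFetcher relies on a small hand-off convention: each run returns the sync that should replace it, or nil when nothing is left to do. A minimal sketch of that loop shape follows, with a hypothetical `job` type whose `next` field stands in for a newly requested state root.

```go
package main

import "fmt"

// job is a stand-in for a state sync. next models a replacement sync that
// was requested while this one was still running (nil if there was none).
type job struct {
	root string
	next *job
}

// run processes one job, records it, and returns its replacement, if any.
func run(j *job, log *[]string) *job {
	*log = append(*log, j.root)
	return j.next
}

// runAll keeps running until a job finishes without being replaced.
func runAll(first *job) []string {
	var log []string
	for next := first; next != nil; {
		next = run(next, &log)
	}
	return log
}

func main() {
	fmt.Println(runAll(&job{root: "rootA", next: &job{root: "rootB"}})) // [rootA rootB]
}
```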
// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
	var (
		active   = make(map[string]*stateReq) // Currently in-flight requests
		finished []*stateReq                  // Completed or failed requests
		timeout  = make(chan *stateReq)       // Timed out active requests
	)
	log.Trace("State sync starting", "root", s.root)

	defer func() {
		// Cancel active request timers on exit. Also set peers to idle so they're
		// available for the next sync.
		for _, req := range active {
			req.timer.Stop()
			req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
		}
	}()
	go s.run()
	defer s.Cancel()

	// Listen for peer departure events to cancel assigned tasks
	peerDrop := make(chan *peerConnection, 1024)
	peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
	defer peerSub.Unsubscribe()

	for {
		// Enable sending of the first buffered element if there is one.
		var (
			deliverReq   *stateReq
			deliverReqCh chan *stateReq
		)
		if len(finished) > 0 {
			deliverReq = finished[0]
			deliverReqCh = s.deliver
		}

		select {
		// The stateSync lifecycle:
		case next := <-d.stateSyncStart:
			d.spindownStateSync(active, finished, timeout, peerDrop)
			return next

		case <-s.done:
			d.spindownStateSync(active, finished, timeout, peerDrop)
			return nil

		// Send the next finished request to the current sync:
		case deliverReqCh <- deliverReq:
			// Shift out the first request, but also set the emptied slot to nil for GC
			copy(finished, finished[1:])
			finished[len(finished)-1] = nil
			finished = finished[:len(finished)-1]

		// Handle incoming state packs:
		case pack := <-d.stateCh:
			// Discard any data not requested (or previously timed out)
			req := active[pack.PeerId()]
			if req == nil {
				log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
				continue
			}
			// Finalize the request and queue up for processing
			req.timer.Stop()
			req.response = pack.(*statePack).states
			req.delivered = time.Now()

			finished = append(finished, req)
			delete(active, pack.PeerId())

		// Handle dropped peer connections:
		case p := <-peerDrop:
			// Skip if no request is currently pending
			req := active[p.id]
			if req == nil {
				continue
			}
			// Finalize the request and queue up for processing
			req.timer.Stop()
			req.dropped = true
			req.delivered = time.Now()

			finished = append(finished, req)
			delete(active, p.id)

		// Handle timed-out requests:
		case req := <-timeout:
			// If the peer is already requesting something else, ignore the stale timeout.
			// This can happen when the timeout and the delivery happens simultaneously,
			// causing both pathways to trigger.
			if active[req.peer.id] != req {
				continue
			}
			req.delivered = time.Now()
			// Move the timed out data back into the download queue
			finished = append(finished, req)
			delete(active, req.peer.id)

		// Track outgoing state requests:
		case req := <-d.trackStateReq:
			// If an active request already exists for this peer, we have a problem. In
			// theory the trie node schedule must never assign two requests to the same
			// peer. In practice however, a peer might receive a request, disconnect and
			// immediately reconnect before the previous times out. In this case the first
			// request is never honored, alas we must not silently overwrite it, as that
			// causes valid requests to go missing and sync to get stuck.
			if old := active[req.peer.id]; old != nil {
				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
				// Move the previous request to the finished set
				old.timer.Stop()
				old.dropped = true
				old.delivered = time.Now()
				finished = append(finished, old)
			}
			// Start a timer to notify the sync loop if the peer stalled.
			req.timer = time.AfterFunc(req.timeout, func() {
				timeout <- req
			})
			active[req.peer.id] = req
		}
	}
}
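runStateSync only offers a finished request for delivery when one exists. It does this by leaving `deliverReqCh` nil otherwise, and a send on a nil channel never becomes ready, so that select case is effectively switched off. The sketch below shows the same trick in a small relay with an unbounded buffer. `forward` and its channels are hypothetical and unrelated to the downloader's types.

```go
package main

import "fmt"

// forward relays values from in to out through an unbounded in-memory
// buffer, preserving order. The send case is disabled by keeping sendCh
// nil whenever the buffer is empty.
func forward(in <-chan int, out chan<- int) {
	var pending []int
	for in != nil || len(pending) > 0 {
		var (
			sendCh chan<- int // nil unless there is something to send
			head   int
		)
		if len(pending) > 0 {
			sendCh, head = out, pending[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: stop receiving, keep flushing
				continue
			}
			pending = append(pending, v)
		case sendCh <- head: // never ready while sendCh is nil
			pending = pending[1:]
		}
	}
	close(out)
}

func main() {
	in, out := make(chan int), make(chan int)
	go forward(in, out)
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()
	for v := range out {
		fmt.Println(v) // prints 1, 2, 3 on separate lines
	}
}
```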