文章前言
本篇文章是以太坊区块数据同步方式简易介绍(上)的续篇
源码分析
同步Head
fetchHead方法用于从远程节点获取链的头部头和之前的轴块,函数首先获取下载器的工作模式并请求远程节点的最新头部块,如果工作模式是FastSync则还会请求轴块头,随后函数启动一个goroutine来向远程节点请求头部块并在后台等待响应,同时函数设置了一个超时时间以确保在超时之前能够收到响应,在无限循环中函数通过select语句监听多个通道的事件
// fetchHead retrieves the head header and prior pivot block (if available) from
// a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
p.log.Debug("Retrieving remote chain head")
mode := d.getMode()
// Request the advertised remote head block and wait for the response
latest, _ := p.peer.Head()
fetch := 1
if mode == FastSync {
fetch = 2 // head + pivot headers
}
go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)
ttl := d.requestTTL()
timeout := time.After(ttl)
for {
select {
case <-d.cancelCh:
return nil, nil, errCanceled
case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer gave us at least one and at most the requested headers
headers := packet.(*headerPack).headers
if len(headers) == 0 || len(headers) > fetch {
return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
}
// The first header needs to be the head, validate against the checkpoint
// and request. If only 1 header was returned, make sure there's no pivot
// or there was not one requested.
head := headers[0]
if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
}
if len(headers) == 1 {
if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
}
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
return head, nil, nil
}
// At this point we have 2 headers in total and the first is the
// validated head of the chian. Check the pivot number and return,
pivot := headers[1]
if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
}
return head, pivot, nil
case <-timeout:
p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
return nil, nil, errTimeout
case <-d.bodyCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
}
}
处理Head
下面代码中的processHeaders方法用于处理从输入通道接收到的一批头部块并将它们处理和调度到头部链和下载器的队列中,直到流结束或发生错误为止,函数开始时定义了一些变量,包括rollback(用于回滚的不确定头部块的数量)、rollbackErr(回滚时的错误)、mode(下载器的工作模式),同时函数使用defer语句定义了一个延迟函数用于在函数返回前执行一些操作,包括回滚链段和打印警告信息,在无限循环中函数使用select语句监听多个通道的事件:
- 如果接收到d.cancelCh通道的信号,表示下载被取消,函数会返回相应的错误
- 如果接收到d.headerProcCh通道的头部块批次,函数会进行一系列处理操作,包括验证和调度头部块
- 如果头部块批次的长度为0,表示头部块已完全处理,函数会做一些处理,包括通知其他相关方和进行一些检查
- 否则函数会将头部块批次分割成小批次并进行相应的处理和调度
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
rollbackErr error
mode = d.getMode()
)
defer func() {
if rollback > 0 {
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if mode != LightSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number()
}
if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
// We're already unwinding the stack, only print the error to make it more visible
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
}
curFastBlock, curBlock := common.Big0, common.Big0
if mode != LightSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number()
}
log.Warn("Rolled back chain segment",
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
}()
// Wait for batches of headers to process
gotHeaders := false
for {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
case headers := <-d.headerProcCh:
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
// If no headers were retrieved at all, the peer violated its TD promise that it had a
// better chain compared to ours. The only exception is if its promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
return errStallingPeer
}
}
// If fast or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if mode == FastSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
// Disable any rollback and return
rollback = 0
return nil
}
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
default:
}
// Select the next chunk of headers to import
limit := maxHeadersProcess
if limit > len(headers) {
limit = len(headers)
}
chunk := headers[:limit]
// In case of header only syncing, validate the chunk immediately
if mode == FastSync || mode == LightSync {
// If we're importing pure headers, verify based on their recentness
var pivot uint64
d.pivotLock.RLock()
if d.pivotHeader != nil {
pivot = d.pivotHeader.Number.Uint64()
}
d.pivotLock.RUnlock()
frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
rollbackErr = err
// If some headers were inserted, track them as uncertain
if (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {
rollback = chunk[0].Number.Uint64()
}
log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, track all headers within the alloted limits
if mode == FastSync {
head := chunk[len(chunk)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
rollback = 1
}
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync || mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
case <-time.After(time.Second):
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
headers = headers[limit:]
origin += uint64(limit)
}
// Update the highest block number we know if a higher one is found.
d.syncStatsLock.Lock()
if d.syncStatsChainHeight < origin {
d.syncStatsChainHeight = origin - 1
}
d.syncStatsLock.Unlock()
// Signal the content downloaders of the availablility of new tasks
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
}
}
}
}
}
同步Body
fetchBodies方法用于迭代地下载已调度的区块体,该方法会获取可用的对等节点并为每个对等节点保留一段区块,等待区块体的传输并定期检查超时情况,方法开始时定义了一些函数变量用于执行不同的操作:
- deliver函数用于将下载的区块体交付给队列进行处理
- expire函数用于检查是否有超时的未传输的区块体,并返回需要过期的区块体的映射
- fetch函数用于向对等节点发送获取区块体的请求
- capacity函数用于获取对等节点的区块处理能力
- setIdle函数用于设置对等节点的区块体状态为闲置
函数调用d.fetchParts方法,该方法用于执行实际的区块体下载操作,它接收一系列的参数和函数用于处理不同的操作和事件,包括传递区块体、检查区块体超时、获取区块体、取消区块体请求等
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
log.Debug("Downloading block bodies", "origin", from)
var (
deliver = func(packet dataPack) (int, error) {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
}
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
)
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
log.Debug("Block body download terminated", "err", err)
return err
}
随后调用DeliverBodies函数
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
随后跟进deliver方法内部:
// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
inMeter.Mark(int64(packet.Items()))
defer func() {
if err != nil {
dropMeter.Mark(int64(packet.Items()))
}
}()
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
if cancel == nil {
return errNoSyncActive
}
select {
case destCh <- packet:
return nil
case <-cancel:
return errNoSyncActive
}
}
随后调用fetchParts方法执行区块数据的获取和处理,它接收多个参数和函数作为输入,用于执行不同的操作和事件,包括传递数据、检查超时、获取数据、取消请求等,在方法的开始部分创建了一个定时器(ticker)用于检测过期的检索任务并在方法结束时关闭定时器,随后通过一个无限循环,使用select语句监听多个通道的事件:
- 如果收到d.cancelCh通道的事件,表示接收到了取消请求,就返回errCanceled错误
- 如果收到deliveryCh通道的事件,表示有数据包传递到达,执行相关处理操作,包括将数据交付给队列处理、检查链的有效性、设置对等节点的状态等
- 如果收到wakeCh通道的事件,表示区块头部的获取操作发送了继续标志,检查是否完成了区块头部的获取
- 如果收到定时器ticker.C的事件,表示定时器触发,执行一些操作,如更新进度
- 如果收到update通道的事件,表示需要更新进度,根据不同的情况执行相应的操作,如检查超时、发送下载请求等
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
update := make(chan struct{}, 1)
// Prepare the queue and fetch block parts until the block header fetcher's done
finished := false
for {
select {
case <-d.cancelCh:
return errCanceled
case packet := <-deliveryCh:
deliveryTime := time.Now()
// If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of data and check chain validity
accepted, err := deliver(packet)
if errors.Is(err, errInvalidChain) {
return err
}
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if !errors.Is(err, errStaleDelivery) {
setIdle(peer, accepted, deliveryTime)
}
// Issue a log to the user to see what's going on
switch {
case err == nil && packet.Items() == 0:
peer.log.Trace("Requested data not delivered", "type", kind)
case err == nil:
peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
default:
peer.log.Debug("Failed to deliver retrieved data", "type", kind, "err", err)
}
}
// Blocks assembled, try to update the progress
select {
case update <- struct{}{}:
default:
}
case cont := <-wakeCh:
// The header fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
}
// Headers arrive, try to update the progress
select {
case update <- struct{}{}:
default:
}
case <-ticker.C:
// Sanity check update the progress
select {
case update <- struct{}{}:
default:
}
case <-update:
// Short circuit if we lost all our peers
if d.peers.Len() == 0 {
return errNoPeers
}
// Check for fetch request timeouts and demote the responsible peers
for pid, fails := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
// out that sync wise we need to get rid of the peer.
//
// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
if fails > 2 {
peer.log.Trace("Data delivery timed out", "type", kind)
setIdle(peer, 0, time.Now())
} else {
peer.log.Debug("Stalling delivery, dropping", "type", kind)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
} else {
d.dropPeer(pid)
// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := pid == d.cancelPeer
d.cancelLock.RUnlock()
if master {
d.cancel()
return errTimeout
}
}
}
}
}
// If there's nothing more to fetch, wait or terminate
if pending() == 0 {
if !inFlight() && finished {
log.Debug("Data fetching completed", "type", kind)
return nil
}
break
}
// Send a download request to all idle peers, until throttled
progressed, throttled, running := false, false, inFlight()
idles, total := idle()
pendCount := pending()
for _, peer := range idles {
// Short circuit if throttling activated
if throttled {
break
}
// Short circuit if there is no more available task.
if pendCount = pending(); pendCount == 0 {
break
}
// Reserve a chunk of fetches for a peer. A nil can mean either that
// no more headers are available, or that the peer is known not to
// have them.
request, progress, throttle := reserve(peer, capacity(peer))
if progress {
progressed = true
}
if throttle {
throttled = true
throttleCounter.Inc(1)
}
if request == nil {
continue
}
if request.From > 0 {
peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
} else {
peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
// means that we've double allocated a fetch task to a peer. If that is the
// case, the internal state of the downloader and the queue is very wrong so
// better hard crash and note the error instead of silently accumulating into
// a much bigger issue.
panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
}
running = true
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
return errPeersUnavailable
}
}
}
}
同步收据
下面的这段代码定义了一个名为fetchReceipts的方法,用于逐步下载预定的区块收据(transaction receipts),该方法接收一个起始位置from作为参数,表示从哪个位置开始下载收据
// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
log.Debug("Downloading transaction receipts", "origin", from)
var (
deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
}
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetReceiptsIdle(accepted, deliveryTime)
}
)
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
log.Debug("Transaction receipt download terminated", "err", err)
return err
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}
// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
inMeter.Mark(int64(packet.Items()))
defer func() {
if err != nil {
dropMeter.Mark(int64(packet.Items()))
}
}()
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
if cancel == nil {
return errNoSyncActive
}
select {
case destCh <- packet:
return nil
case <-cancel:
return errNoSyncActive
}
}
文末小结
本文简要介绍了以太坊区块数据的同步方式,以太坊是一个分布式的区块链网络,区块数据的同步是网络中节点之间的重要活动。主要包括两个方面:区块头部的同步和区块数据的获取和处理
区块头部的同步是通过节点之间相互广播区块头部信息来实现的,节点通过交换区块头部信息来了解整个区块链的结构和状态,以便验证和同步区块链
区块数据的获取和处理是指节点通过网络获取区块数据并进行相应的处理操作,节点在获取区块数据时可以通过获取区块头部信息,然后根据区块头部信息请求相应的区块数据,获取到区块数据后,节点会对数据进行处理,包括验证区块数据的有效性、交付给队列处理、检查超时等操作