Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 8 additions & 26 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,27 +1540,12 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// existing local chain segments (reorg around the chain tip). The reorganized part
// will be included in the provided chain segment, and stale canonical markers will be
// silently rewritten. Therefore, no explicit reorg logic is needed.
writeLive := func(blockChain types.Blocks, receiptChain []rlp.RawValue) (int, error) {
var (
skipPresenceCheck = false
batch = bc.db.NewBatch()
)
writeLive := func(blockChain types.Blocks, receiptChain []rlp.RawValue) error {
batch := bc.db.NewBatch()
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
if !skipPresenceCheck {
// Ignore if the entire data is already known
if bc.HasBlock(block.Hash(), block.NumberU64()) {
stats.ignored++
continue
} else {
// If block N is not present, neither are the later blocks.
// This should be true, but if we are mistaken, the shortcut
// here will only cause overwriting of some existing data
skipPresenceCheck = true
}
return errInsertionInterrupted
}
// Write all the data out into the database
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
Expand All @@ -1572,7 +1557,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// except transaction indexes(will be created once sync is finished).
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
return err
}
size += int64(batch.ValueSize())
batch.Reset()
Expand All @@ -1585,13 +1570,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if batch.ValueSize() > 0 {
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
return 0, err
return err
}
}
if err := updateHead(blockChain[len(blockChain)-1].Header()); err != nil {
return 0, err
}
return 0, nil
return updateHead(blockChain[len(blockChain)-1].Header())
}

// Split the supplied blocks into two groups, according to the
Expand All @@ -1608,11 +1590,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
}
if index != len(blockChain) {
if n, err := writeLive(blockChain[index:], receiptChain[index:]); err != nil {
if err := writeLive(blockChain[index:], receiptChain[index:]); err != nil {
if err == errInsertionInterrupted {
return 0, nil
}
return n, err
return 0, err
}
}
var (
Expand Down
5 changes: 4 additions & 1 deletion core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
// Retrieve all the components of the canonical block.
hash := ReadCanonicalHash(nfdb, number)
if hash == (common.Hash{}) {
return fmt.Errorf("canonical hash missing, can't freeze block %d", number)
// A missing canonical mapping at the freeze frontier is almost
// always an orphaned block left by an unclean stop (header/body
// present by hash, but no number->hash mapping).
return fmt.Errorf("canonical hash missing, can't freeze block %d (block data present at height: %v)", number, ReadAllHashes(nfdb, number))
}
header := ReadHeaderRLP(nfdb, hash, number)
if len(header) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ func (f *Freezer) TruncateTail(group string, tail uint64) (uint64, error) {

// SyncAncient flushes all data tables to disk.
func (f *Freezer) SyncAncient() error {
f.writeLock.RLock()
defer f.writeLock.RUnlock()

var errs []error
for _, table := range f.tables {
if err := table.Sync(); err != nil {
Expand Down
17 changes: 12 additions & 5 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,17 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
}
log.Debug("Searching beacon ancestor", "local", number, "beaconhead", beaconHead.Number, "beacontail", beaconTail.Number)

var linked bool
// Require the canonical mapping, not just presence by hash, so orphans and
// side chains are re-delivered instead of left in place.
var (
linked bool
num = beaconTail.Number.Uint64() - 1
)
switch d.getMode() {
case ethconfig.FullSync:
linked = d.blockchain.HasBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
linked = d.blockchain.GetCanonicalHash(num) == beaconTail.ParentHash && d.blockchain.HasBlock(beaconTail.ParentHash, num)
case ethconfig.SnapSync:
linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
linked = d.blockchain.GetCanonicalHash(num) == beaconTail.ParentHash && d.blockchain.HasFastBlock(beaconTail.ParentHash, num)
default:
panic("unknown sync mode")
}
Expand Down Expand Up @@ -226,12 +231,14 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
}
n := h.Number.Uint64()

// Require the canonical mapping, not just presence by hash, so orphans
// and side chains are re-synced instead of treated as already owned.
var known bool
switch d.getMode() {
case ethconfig.FullSync:
known = d.blockchain.HasBlock(h.Hash(), n)
known = d.blockchain.GetCanonicalHash(n) == h.Hash() && d.blockchain.HasBlock(h.Hash(), n)
case ethconfig.SnapSync:
known = d.blockchain.HasFastBlock(h.Hash(), n)
known = d.blockchain.GetCanonicalHash(n) == h.Hash() && d.blockchain.HasFastBlock(h.Hash(), n)
default:
panic("unknown sync mode")
}
Expand Down
4 changes: 4 additions & 0 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ type BlockChain interface {
// HasFastBlock verifies a snap block's presence in the local chain.
HasFastBlock(common.Hash, uint64) bool

// GetCanonicalHash returns the canonical hash for the block at the given
// number, or the zero hash if no canonical block is present at that height.
GetCanonicalHash(uint64) common.Hash

// GetBlockByHash retrieves a block from the local chain.
GetBlockByHash(common.Hash) *types.Block

Expand Down
75 changes: 75 additions & 0 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
Expand All @@ -40,6 +41,7 @@ import (

// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
db ethdb.Database
chain *core.BlockChain
downloader *Downloader

Expand Down Expand Up @@ -77,6 +79,7 @@ func newTesterWithSnap(t *testing.T, mode ethconfig.SyncMode, success func(), sn
panic(err)
}
tester := &downloadTester{
db: db,
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
Expand Down Expand Up @@ -683,6 +686,78 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
}
}

// TestBeaconSyncRepairFork verifies the end-to-end repair of non-canonical block
// data. The local node sits on fork A, but fork B's blocks below the local head
// are also present by hash (no canonical mapping), as if imported optimistically
// via the engine API. When the beacon chain switches to fork B, sync must not
// anchor on the non-canonical fork-B data; it has to descend to the real common
// ancestor and re-deliver everything, ending with the full fork-B chain present
// and canonical at every height - for both snap and full sync.
func TestBeaconSyncRepairForkFull(t *testing.T) { testBeaconSyncRepairFork(t, eth.ETH69, FullSync) }
func TestBeaconSyncRepairForkSnap(t *testing.T) { testBeaconSyncRepairFork(t, eth.ETH69, SnapSync) }

func testBeaconSyncRepairFork(t *testing.T, protocol uint, mode SyncMode) {
// Reuse the pre-generated fork chains (new chains can't be generated after the
// package init). Fork A and fork B share the whole testChainBase prefix and
// diverge at height len(testChainBase.blocks); fork B (the beacon target) is
// longer, so it wins the reorg. The exact shortenings used here are the ones
// registered as peer chains during init.
localChain := testChainForkLightA.shorten(len(testChainBase.blocks) + 80)
targetChain := testChainForkLightB.shorten(len(testChainBase.blocks) + MaxHeaderFetch)

forkPoint := uint64(len(testChainBase.blocks)) // first height the forks differ
localHead := uint64(len(localChain.blocks) - 1)
targetHead := uint64(len(targetChain.blocks) - 1)

success := make(chan struct{})
tester := newTesterWithNotification(t, mode, func() {
close(success)
})
defer tester.terminate()

tester.newPeer("peer", protocol, targetChain.blocks[1:])

// Make fork A the local canonical chain.
if _, err := tester.chain.InsertChain(localChain.blocks[1 : localHead+1]); err != nil {
t.Fatalf("failed to build local chain: %v", err)
}
// Seed fork B's divergent blocks that sit below the local head as scattered,
// non-canonical data: full block data present by hash, but the canonical
// mapping at those heights still points at fork A.
for n := forkPoint; n <= localHead; n++ {
b := targetChain.blocks[n]
rawdb.WriteBlock(tester.db, b)
rawdb.WriteReceipts(tester.db, b.Hash(), b.NumberU64(), types.Receipts{})
}

if err := tester.downloader.BeaconSync(targetChain.blocks[targetHead].Header(), nil); err != nil {
t.Fatalf("failed to beacon-sync chain: %v", err)
}
select {
case <-success:
case <-time.NewTimer(10 * time.Second).C:
t.Fatalf("failed to sync chain in ten seconds")
}
// The head must reach fork B's tip.
if got := tester.chain.CurrentBlock().Number.Uint64(); got != targetHead {
t.Fatalf("synced head mismatch: have %d, want %d", got, targetHead)
}
// Every height must be canonical to fork B and carry complete block data,
// proving the non-canonical fork-A / seed data was fully reorged out.
for n := uint64(1); n <= targetHead; n++ {
want := targetChain.blocks[n].Hash()
if got := rawdb.ReadCanonicalHash(tester.db, n); got != want {
t.Fatalf("canonical hash at %d: have %x, want %x", n, got, want)
}
if !rawdb.HasHeader(tester.db, want, n) || !rawdb.HasBody(tester.db, want, n) {
t.Fatalf("incomplete block data at %d after sync", n)
}
if !rawdb.HasReceipts(tester.db, want, n) {
t.Fatalf("missing receipts at %d after sync", n)
}
}
}

// Tests that synchronisation progress (origin block number, current block number
// and highest block number) is tracked and updated correctly.
func TestSyncProgressFull(t *testing.T) { testSyncProgress(t, eth.ETH69, FullSync) }
Expand Down
8 changes: 7 additions & 1 deletion eth/downloader/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,13 @@ func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) err
// linked returns the flag indicating whether the skeleton has been linked with
// the local chain.
func (s *skeleton) linked(number uint64, hash common.Hash) bool {
linked := rawdb.HasHeader(s.db, hash, number) &&
// Require the canonical mapping, not just presence by hash. A block present
// only by hash (side chain or orphan from an unclean shutdown) must not be
// used as the link-up point, otherwise it's left in place forever without its
// canonical mapping ever being rewritten. Keep descending to a real canonical
// block.
linked := rawdb.ReadCanonicalHash(s.db, number) == hash &&
rawdb.HasHeader(s.db, hash, number) &&
rawdb.HasBody(s.db, hash, number) &&
rawdb.HasReceipts(s.db, hash, number)

Expand Down
67 changes: 67 additions & 0 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {

rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[0]))
rawdb.WriteReceipts(db, chain[0].Hash(), chain[0].Number.Uint64(), types.Receipts{})
rawdb.WriteCanonicalHash(db, chain[0].Hash(), chain[0].Number.Uint64())

// Create a peer set to feed headers through
peerset := newPeerSet()
Expand Down Expand Up @@ -871,6 +872,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {

rawdb.WriteBlock(db, types.NewBlockWithHeader(header))
rawdb.WriteReceipts(db, header.Hash(), header.Number.Uint64(), types.Receipts{})
rawdb.WriteCanonicalHash(db, header.Hash(), header.Number.Uint64())

rawdb.DeleteSkeletonHeader(db, header.Number.Uint64())

Expand All @@ -881,6 +883,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {

rawdb.WriteBlock(db, types.NewBlockWithHeader(filled))
rawdb.WriteReceipts(db, filled.Hash(), filled.Number.Uint64(), types.Receipts{})
rawdb.WriteCanonicalHash(db, filled.Hash(), filled.Number.Uint64())
},

suspendHook: func() *types.Header {
Expand Down Expand Up @@ -941,6 +944,70 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
}
}

// TestSkeletonLinkSkipsNonCanonical verifies that the skeleton only links to a
// local block that is canonical, and descends past block data that is present by
// hash but lacks a canonical number->hash mapping (e.g. blocks imported
// optimistically via the engine API, or orphans left by an unclean shutdown).
// Anchoring on such a block would leave it in place forever without its canonical
// mapping being rewritten, wedging the freezer later on.
func TestSkeletonLinkSkipsNonCanonical(t *testing.T) {
// Build a fake header chain; the skeleton only needs a parent-hash progression.
chain := []*types.Header{{Number: big.NewInt(0)}}
for i := 1; i < 200; i++ {
chain = append(chain, &types.Header{
ParentHash: chain[i-1].Hash(),
Number: big.NewInt(int64(i)),
})
}
const (
canon = 100 // blocks [0..canon] are present AND canonical locally
top = 150 // blocks [canon+1..top] are present by hash but NOT canonical
)
db := rawdb.NewMemoryDatabase()

// Canonical prefix: full block data plus the canonical number->hash mapping.
for i := 0; i <= canon; i++ {
rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[i]))
rawdb.WriteReceipts(db, chain[i].Hash(), chain[i].Number.Uint64(), types.Receipts{})
rawdb.WriteCanonicalHash(db, chain[i].Hash(), chain[i].Number.Uint64())
}
// Scattered non-canonical data: full block data is present, but the canonical
// mapping is deliberately absent.
for i := canon + 1; i <= top; i++ {
rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[i]))
rawdb.WriteReceipts(db, chain[i].Hash(), chain[i].Number.Uint64(), types.Receipts{})
}

// Feed the full chain through a single peer. The fakeChainReader reports an
// effectively infinite snap head, so the canonical mapping is the only thing
// that can prevent the skeleton from linking on a non-canonical block.
peerset := newPeerSet()
drop := func(peer string) { peerset.Unregister(peer) }
peer := newSkeletonTestPeer("peer", chain)
if err := peerset.Register(newPeerConnection(peer.id, eth.ETH69, peer, log.New("id", peer.id))); err != nil {
t.Fatalf("failed to register peer: %v", err)
}
skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller(), &fakeChainReader{})
skeleton.Sync(chain[len(chain)-1], nil, true)
defer skeleton.Terminate()

// The skeleton must link at the canonical block, i.e. the resulting subchain
// tail must descend to canon+1 (covering the whole non-canonical region). With
// a hash-only "known" check it would instead stop at top+1.
wantTail := uint64(canon + 1)
var progress skeletonProgress
for deadline := time.Now().Add(2 * time.Second); ; {
json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
if len(progress.Subchains) == 1 && progress.Subchains[0].Tail == wantTail {
break
}
if time.Now().After(deadline) {
t.Fatalf("skeleton did not link at canonical block: subchains=%+v, want single subchain with tail %d", progress.Subchains, wantTail)
}
time.Sleep(20 * time.Millisecond)
}
}

func checkSkeletonProgress(db ethdb.KeyValueReader, unpredictable bool, peers []*skeletonTestPeer, expected skeletonExpect) error {
var progress skeletonProgress
// Check the post-init end state if it matches the required results
Expand Down
Loading