From acbbf8d19e08b1bfd22781053d31608f7467f03f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 28 May 2026 22:20:50 +0100 Subject: [PATCH 1/3] kvstorage: allow multiple WAG events per RangeID in a node The previous WAG invariant required each RangeID to appear at most once in a node's event list. This was too strict for operations that encompass multiple lifecycle transitions for the same range, such as destroying an old replica and creating a new one in CreateUninitializedReplica. Relax the invariant and update canApplyWAGNode to handle multiple events per RangeID. When a RangeID appears more than once, the replay logic now computes the expected post-event state via advance() and uses it to evaluate subsequent events for the same range, instead of re-loading the persisted state. Epic: none Release note: None Co-Authored-By: Claude Opus 4.6 --- pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto | 8 ++- pkg/kv/kvserver/kvstorage/wag/writer.go | 6 +- pkg/kv/kvserver/kvstorage/wag_replay.go | 60 +++++++++++++++++-- pkg/kv/kvserver/kvstorage/wag_replay_test.go | 44 ++++++++++++++ 4 files changed, 107 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto b/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto index b5991c97e9b8..25b6fd03a81e 100644 --- a/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto +++ b/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto @@ -102,8 +102,12 @@ message Addr { // a Split event that both initializes the RHS and narrows the LHS. message Node { // Events contains the replica lifecycle events that this node encompasses. - // Each event affects a single replica, identified by its Addr. A given - // RangeID must appear at most once in this list. + // Each event affects a single replica, identified by its Addr. + // + // A RangeID may appear in multiple events when one operation encompasses + // multiple lifecycle transitions for the same range (e.g. destroying an old + // replica and creating a new one). When a RangeID appears multiple times, its + // events must be ordered by occurrence (earlier transitions first). // // Each EventType implicitly carries dependency semantics. For example, a // Split event at raft index I implies the replica must be caught up to I-1 diff --git a/pkg/kv/kvserver/kvstorage/wag/writer.go b/pkg/kv/kvserver/kvstorage/wag/writer.go index 464502756692..0eb5e8be85d4 100644 --- a/pkg/kv/kvserver/kvstorage/wag/writer.go +++ b/pkg/kv/kvserver/kvstorage/wag/writer.go @@ -24,7 +24,6 @@ type Writer struct { seq *Seq // events accumulates the lifecycle events for this node. Each event // identifies a replica and the type of lifecycle transition it undergoes. - // A given RangeID must appear at most once. events []wagpb.Event } @@ -45,8 +44,9 @@ func (w *Writer) Empty() bool { } // AddEvent stages a lifecycle event. Each event identifies a replica and the -// type of lifecycle transition (create, split, destroy, etc.). A given RangeID -// must appear at most once across all events in a Writer. +// type of lifecycle transition (create, split, destroy, etc.). A RangeID may +// appear in multiple events when one operation encompasses multiple transitions +// for the same range (e.g. destroying an old replica and creating a new one). // // The startKey is the range's start key, used during replay to load the range // descriptor via a point read. It may be nil for EventCreate (uninitialized diff --git a/pkg/kv/kvserver/kvstorage/wag_replay.go b/pkg/kv/kvserver/kvstorage/wag_replay.go index b91c934e7526..4ea332f7a2a3 100644 --- a/pkg/kv/kvserver/kvstorage/wag_replay.go +++ b/pkg/kv/kvserver/kvstorage/wag_replay.go @@ -10,6 +10,7 @@ import ( "iter" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" @@ -184,6 +185,39 @@ func (s persistedRangeState) canApply(e wagpb.Event) (apply bool) { return apply } +// advance returns the speculative persistedRangeState after the given event is +// applied. This is used to check subsequent events for the same RangeID within +// a single WAG node. +func (s persistedRangeState) advance(e wagpb.Event) persistedRangeState { + switch e.Type { + case wagpb.EventCreate: + return persistedRangeState{ + mark: ReplicaMark{ + RaftReplicaID: kvserverpb.RaftReplicaID{ReplicaID: e.Addr.ReplicaID}, + RangeTombstone: s.mark.RangeTombstone, + }, + appliedIndex: 0, + } + case wagpb.EventInit, wagpb.EventApply, wagpb.EventSplit, wagpb.EventMerge: + return persistedRangeState{ + mark: s.mark, + appliedIndex: e.Addr.Index, + } + case wagpb.EventDestroy, wagpb.EventSubsume: + // After destruction, ReplicaID is cleared and the tombstone is bumped above + // ReplicaID. We don't know the exact NextReplicaID here, but it's ok to + // speculate and assume the conservative ReplicaID+1. + next := max(s.mark.NextReplicaID, e.Addr.ReplicaID+1) + return persistedRangeState{ + mark: ReplicaMark{ + RangeTombstone: kvserverpb.RangeTombstone{NextReplicaID: next}, + }, + } + default: + panic(errors.AssertionFailedf("unexpected event type %d", e.Type)) + } +} + // raftCatchUp returns the raft index the replica must be caught up to before // this WAG event can be applied. Zero means no catch-up is needed. func raftCatchUp(e wagpb.Event) kvpb.RaftIndex { @@ -208,16 +242,27 @@ func raftCatchUp(e wagpb.Event) kvpb.RaftIndex { // range state to decide if the node still needs applying. // // All events in a node are expected to agree on whether they need applying, -// since they are written and applied atomically. +// since they are written and applied atomically. A RangeID may appear in +// multiple events (e.g. destroy old replica + create new one); when this +// happens, the speculative post-event metadata is computed so that subsequent +// events for the same RangeID are verified incrementally. func canApplyWAGNode(ctx context.Context, node wagpb.Node, stateRO StateRO) (bool, error) { var apply bool + // states caches per-RangeID state so that subsequent events for the same + // RangeID are checked against the expected post-event state. + // + // TODO(pav-kv): can avoid this map if all events pertaining to one RangeID + // are guaranteed to be placed densely, not interleaved by other range IDs. + states := make(map[roachpb.RangeID]persistedRangeState, len(node.Events)) + for i, e := range node.Events { - s, err := loadPersistedRangeState(ctx, stateRO, e.Addr.RangeID) - if err != nil { - return false, errors.Wrapf(err, "loading state for r%d", e.Addr.RangeID) + s, ok := states[e.Addr.RangeID] + if !ok { + var err error + if s, err = loadPersistedRangeState(ctx, stateRO, e.Addr.RangeID); err != nil { + return false, errors.Wrapf(err, "loading state for r%d", e.Addr.RangeID) + } } - // A given RangeID appears at most once in the events list, so the decision - // of whether an event can be applied is independent for each event. if evApply := s.canApply(e); i == 0 { apply = evApply } else if evApply != apply { @@ -226,6 +271,9 @@ func canApplyWAGNode(ctx context.Context, node wagpb.Node, stateRO StateRO) (boo node.Events[0], apply, i, e, evApply, ) } + if apply { + states[e.Addr.RangeID] = s.advance(e) + } } return apply, nil } diff --git a/pkg/kv/kvserver/kvstorage/wag_replay_test.go b/pkg/kv/kvserver/kvstorage/wag_replay_test.go index 6cece1b97169..68dc06b8db7c 100644 --- a/pkg/kv/kvserver/kvstorage/wag_replay_test.go +++ b/pkg/kv/kvserver/kvstorage/wag_replay_test.go @@ -243,6 +243,50 @@ func TestCanApplyWAGNode(t *testing.T) { {Addr: wagpb.Addr{RangeID: 2, ReplicaID: 1, Index: 10}, Type: wagpb.EventInit}, }}, shouldApply: false, + }, { + name: "multi-event destroy+create same range, needs apply", + states: map[roachpb.RangeID]persistedRangeState{ + // Old replica 5 exists, uninitialized. + 1: {mark: replicaMark(5, 0)}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 0}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: true, + }, { + name: "multi-event destroy+create same range, already applied", + states: map[roachpb.RangeID]persistedRangeState{ + // New replica 10 already exists (destroy+create already applied). + 1: {mark: replicaMark(10, 10)}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 0}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: false, + }, { + name: "multi-event destroy+create initialized replica, needs apply", + states: map[roachpb.RangeID]persistedRangeState{ + // Initialized replica 5, applied up to index 42. + 1: {mark: replicaMark(5, 0), appliedIndex: 42}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 42}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: true, + expCatchUps: []raftCatchUpTarget{{rangeID: 1, index: 42}}, + }, { + name: "multi-event destroy+create initialized replica, already applied", + states: map[roachpb.RangeID]persistedRangeState{ + 1: {mark: replicaMark(10, 10)}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 42}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: false, }, { name: "multi-event disagreement returns error", states: map[roachpb.RangeID]persistedRangeState{ From b3c0361bc0481373e18a8ae3121bf5b4883b45b8 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 28 May 2026 22:22:08 +0100 Subject: [PATCH 2/3] kvstorage: remove LoadReplicaState from CreateUninitializedReplica The LoadReplicaState call at the end of CreateUninitializedReplica was a post-write sanity check that read back the ReplicaMark just written. This required the caller to provide a read-write batch (so own writes are visible), and also required passing storeID and raftRO which were only used by this check. Remove this verification step. The caller (tryGetOrCreateReplica) already calls LoadReplicaState separately after committing the batch, so the check was redundant. Epic: none Release note: None Co-Authored-By: Claude Opus 4.6 --- pkg/kv/kvserver/kvstorage/create_test.go | 2 +- pkg/kv/kvserver/kvstorage/replica_state.go | 16 ++-------------- .../replica_lifecycle_datadriven_test.go | 3 +-- pkg/kv/kvserver/store_create_replica.go | 6 ++---- 4 files changed, 6 insertions(+), 21 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/create_test.go b/pkg/kv/kvserver/kvstorage/create_test.go index ffcdfb185da9..866f01111eca 100644 --- a/pkg/kv/kvserver/kvstorage/create_test.go +++ b/pkg/kv/kvserver/kvstorage/create_test.go @@ -27,7 +27,7 @@ func TestCreateUninitializedReplica(t *testing.T) { ctx := context.Background() out := testMutateSep(t, "create", e, func(rw ReadWriter, w *wag.Writer) { require.NoError(t, CreateUninitializedReplica( - ctx, rw.State, rw.Raft.RO, w, 1, /* storeID */ + ctx, rw.State, w, roachpb.FullReplicaID{RangeID: 123, ReplicaID: 3}, )) }) diff --git a/pkg/kv/kvserver/kvstorage/replica_state.go b/pkg/kv/kvserver/kvstorage/replica_state.go index fb1246a7cd3d..b043dd8633e1 100644 --- a/pkg/kv/kvserver/kvstorage/replica_state.go +++ b/pkg/kv/kvserver/kvstorage/replica_state.go @@ -109,12 +109,7 @@ func (r LoadedReplicaState) check(storeID roachpb.StoreID) error { // Returns kvpb.RaftGroupDeletedError if this replica can not be created // because it has been deleted. func CreateUninitializedReplica( - ctx context.Context, - stateRW State, - raftRO RaftRO, - w *wag.Writer, - storeID roachpb.StoreID, - id roachpb.FullReplicaID, + ctx context.Context, stateRW State, w *wag.Writer, id roachpb.FullReplicaID, ) error { sl := MakeStateLoader(id.RangeID) // Before creating the replica, see if there is a tombstone or an existing @@ -141,12 +136,5 @@ func CreateUninitializedReplica( // non-existent. The only RangeID-specific key that can be present is the // RangeTombstone inspected above. w.AddEvent(wagpb.MakeAddr(id, 0), wagpb.EventCreate, nil /* startKey */) - if err := sl.SetRaftReplicaID(ctx, stateRW.WO, id.ReplicaID); err != nil { - return err - } - - // Make sure that storage invariants for this uninitialized replica hold. - uninitDesc := roachpb.RangeDescriptor{RangeID: id.RangeID} - _, err := LoadReplicaState(ctx, stateRW.RO, raftRO, storeID, &uninitDesc, id.ReplicaID) - return err + return sl.SetRaftReplicaID(ctx, stateRW.WO, id.ReplicaID) } diff --git a/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go b/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go index 18e08c3a14ef..c55d1f6b7d3f 100644 --- a/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go +++ b/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go @@ -209,9 +209,8 @@ func TestReplicaLifecycleDataDriven(t *testing.T) { )) } else { err = kvstorage.CreateUninitializedReplica( - ctx, kvstorage.WrapState(b.State()), kvstorage.RaftRO(tc.eng.LogEngine()), + ctx, kvstorage.WrapState(b.State()), b.WagWriter(), - 1, /* StoreID */ roachpb.FullReplicaID{RangeID: rs.desc.RangeID, ReplicaID: repl.ReplicaID}, ) } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 2139f0b0b95d..4e238fb95b8a 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -185,14 +185,12 @@ func (s *Store) tryGetOrCreateReplica( // Replica for this rangeID, and that's us. // Use a read-write batch (not a write-only batch) because - // CreateUninitializedReplica needs to read back the RaftReplicaID it writes - // as part of its LoadReplicaState verification step. + // CreateUninitializedReplica reads the ReplicaMark before writing. b := s.batchFactory.NewBatch() defer b.Close() if err := kvstorage.CreateUninitializedReplica( ctx, kvstorage.WrapState(b.State()), - kvstorage.RaftRO(s.LogEngine()), - b.WagWriter(), s.StoreID(), id, + b.WagWriter(), id, ); err != nil { return nil, false, err } From 5190175ebad8799ed7ae740f81340e1b352eb817 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 28 May 2026 22:22:57 +0100 Subject: [PATCH 3/3] kvstorage: destroy old replica in CreateUninitializedReplica When CreateUninitializedReplica finds an existing replica with an older ReplicaID, it must destroy the old replica before creating the new one. Previously, the code fell through and only overwrote the RaftReplicaID, leaving behind stale raft state (HardState, raft log) from the old replica. This could cause a newly created replica to pick up a non-empty HardState.Commit, violating the uninitialized replica invariant. Fix this by calling DestroyReplica on the old replica, which clears all its state and writes a RangeTombstone. This is the same path used by removeUninitializedReplicaRaftMuLocked. The function signature changes from taking State to taking a ReadWriter, since destroying the old replica requires raft write access. Epic: none Release note: None Co-Authored-By: Claude Opus 4.6 --- pkg/kv/kvserver/kvstorage/create_test.go | 2 +- pkg/kv/kvserver/kvstorage/replica_state.go | 19 +++++++++++-------- .../replica_lifecycle_datadriven_test.go | 5 ++++- pkg/kv/kvserver/store_create_replica.go | 5 ++++- .../uninitialized_replica_restart.txt | 9 ++++++--- 5 files changed, 26 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/create_test.go b/pkg/kv/kvserver/kvstorage/create_test.go index 866f01111eca..3bce0692d94d 100644 --- a/pkg/kv/kvserver/kvstorage/create_test.go +++ b/pkg/kv/kvserver/kvstorage/create_test.go @@ -27,7 +27,7 @@ func TestCreateUninitializedReplica(t *testing.T) { ctx := context.Background() out := testMutateSep(t, "create", e, func(rw ReadWriter, w *wag.Writer) { require.NoError(t, CreateUninitializedReplica( - ctx, rw.State, w, + ctx, rw, w, roachpb.FullReplicaID{RangeID: 123, ReplicaID: 3}, )) }) diff --git a/pkg/kv/kvserver/kvstorage/replica_state.go b/pkg/kv/kvserver/kvstorage/replica_state.go index b043dd8633e1..5048ae58ff52 100644 --- a/pkg/kv/kvserver/kvstorage/replica_state.go +++ b/pkg/kv/kvserver/kvstorage/replica_state.go @@ -109,24 +109,27 @@ func (r LoadedReplicaState) check(storeID roachpb.StoreID) error { // Returns kvpb.RaftGroupDeletedError if this replica can not be created // because it has been deleted. func CreateUninitializedReplica( - ctx context.Context, stateRW State, w *wag.Writer, id roachpb.FullReplicaID, + ctx context.Context, rw ReadWriter, w *wag.Writer, id roachpb.FullReplicaID, ) error { sl := MakeStateLoader(id.RangeID) // Before creating the replica, see if there is a tombstone or an existing // replica which would indicate that our ReplicaID is stale and can not come // back to this Store again. - if mark, err := sl.LoadReplicaMark(ctx, stateRW.RO); err != nil { + if mark, err := sl.LoadReplicaMark(ctx, rw.State.RO); err != nil { return err } else if mark.Destroyed(id.ReplicaID) { return &kvpb.RaftGroupDeletedError{} } else if mark.Is(id.ReplicaID) { return nil // the replica already exists } else if mark.Exists() { - // TODO(pav-kv): there is a replica with an older ReplicaID. We must destroy - // it, and create a new one. Right now, the code falls through and writes - // the new RaftReplicaID, but this replica can already have a non-empty - // HardState. This is a bug. - _ = 0 // make linter happy + // There is a replica with an older ReplicaID. Destroy it before creating + // the new one, to clean up all its state (e.g. HardState, raft log). + oldID := roachpb.FullReplicaID{RangeID: id.RangeID, ReplicaID: mark.ReplicaID} + if err := DestroyReplica(ctx, rw, w, DestroyReplicaInfo{ + FullReplicaID: oldID, + }, id.ReplicaID); err != nil { + return err + } } // Write the RaftReplicaID for this replica. This is the only place in the @@ -136,5 +139,5 @@ func CreateUninitializedReplica( // non-existent. The only RangeID-specific key that can be present is the // RangeTombstone inspected above. w.AddEvent(wagpb.MakeAddr(id, 0), wagpb.EventCreate, nil /* startKey */) - return sl.SetRaftReplicaID(ctx, stateRW.WO, id.ReplicaID) + return sl.SetRaftReplicaID(ctx, rw.State.WO, id.ReplicaID) } diff --git a/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go b/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go index c55d1f6b7d3f..6de3a6e4a0f8 100644 --- a/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go +++ b/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go @@ -209,7 +209,10 @@ func TestReplicaLifecycleDataDriven(t *testing.T) { )) } else { err = kvstorage.CreateUninitializedReplica( - ctx, kvstorage.WrapState(b.State()), + ctx, kvstorage.ReadWriter{ + State: kvstorage.WrapState(b.State()), + Raft: kvstorage.Raft{RO: kvstorage.RaftRO(tc.eng.LogEngine()), WO: kvstorage.RaftWO(b.Raft())}, + }, b.WagWriter(), roachpb.FullReplicaID{RangeID: rs.desc.RangeID, ReplicaID: repl.ReplicaID}, ) diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 4e238fb95b8a..ccc67cc23290 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -189,7 +189,10 @@ func (s *Store) tryGetOrCreateReplica( b := s.batchFactory.NewBatch() defer b.Close() if err := kvstorage.CreateUninitializedReplica( - ctx, kvstorage.WrapState(b.State()), + ctx, kvstorage.ReadWriter{ + State: kvstorage.WrapState(b.State()), + Raft: kvstorage.Raft{RO: kvstorage.RaftRO(s.LogEngine()), WO: kvstorage.RaftWO(b.Raft())}, + }, b.WagWriter(), id, ); err != nil { return nil, false, err diff --git a/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt b/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt index e042e2bb3e75..e76c355bd6be 100644 --- a/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt +++ b/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt @@ -52,11 +52,14 @@ created replica: (n1,s1):10 state engine: (matches WAG node) log engine: -Put: 0,0 /Local/Store/wag/2 (0x01737761676e000000000000000200): (r1/10:0,EventCreate) +Delete (Sized at 38): 0,0 /Local/RangeID/1/u/RaftHardState (0x016989757266746800): +Put: 0,0 /Local/Store/wag/2 (0x01737761676e000000000000000200): (r1/5:0,EventDestroy) (r1/10:0,EventCreate) +> Put: 0,0 /Local/RangeID/1/u/RangeTombstone (0x016989757266746200): next_replica_id:10 +> Delete: 0,0 /Local/RangeID/1/u/RaftReplicaID (0x016989757266747200): > Put: 0,0 /Local/RangeID/1/u/RaftReplicaID (0x016989757266747200): replica_id:10 -# TODO(pav-kv): HardState of the previous replica should not be inherited. +# The old replica's HardState is cleared by the destroy. print-range-state ---- range desc: r1:{a-d} [(n1,s1):5, (n2,s2):6, (n3,s3):7, next=8, gen=0] - replica (n1/s1): id=10 [uninitialized] HardState={Term:10,Vote:2,Commit:0} + replica (n1/s1): id=10 [uninitialized] HardState={Term:0,Vote:0,Commit:0}