diff --git a/pkg/kv/kvserver/kvstorage/create_test.go b/pkg/kv/kvserver/kvstorage/create_test.go index ffcdfb185da9..3bce0692d94d 100644 --- a/pkg/kv/kvserver/kvstorage/create_test.go +++ b/pkg/kv/kvserver/kvstorage/create_test.go @@ -27,7 +27,7 @@ func TestCreateUninitializedReplica(t *testing.T) { ctx := context.Background() out := testMutateSep(t, "create", e, func(rw ReadWriter, w *wag.Writer) { require.NoError(t, CreateUninitializedReplica( - ctx, rw.State, rw.Raft.RO, w, 1, /* storeID */ + ctx, rw, w, roachpb.FullReplicaID{RangeID: 123, ReplicaID: 3}, )) }) diff --git a/pkg/kv/kvserver/kvstorage/replica_state.go b/pkg/kv/kvserver/kvstorage/replica_state.go index fb1246a7cd3d..5048ae58ff52 100644 --- a/pkg/kv/kvserver/kvstorage/replica_state.go +++ b/pkg/kv/kvserver/kvstorage/replica_state.go @@ -109,29 +109,27 @@ func (r LoadedReplicaState) check(storeID roachpb.StoreID) error { // Returns kvpb.RaftGroupDeletedError if this replica can not be created // because it has been deleted. func CreateUninitializedReplica( - ctx context.Context, - stateRW State, - raftRO RaftRO, - w *wag.Writer, - storeID roachpb.StoreID, - id roachpb.FullReplicaID, + ctx context.Context, rw ReadWriter, w *wag.Writer, id roachpb.FullReplicaID, ) error { sl := MakeStateLoader(id.RangeID) // Before creating the replica, see if there is a tombstone or an existing // replica which would indicate that our ReplicaID is stale and can not come // back to this Store again. - if mark, err := sl.LoadReplicaMark(ctx, stateRW.RO); err != nil { + if mark, err := sl.LoadReplicaMark(ctx, rw.State.RO); err != nil { return err } else if mark.Destroyed(id.ReplicaID) { return &kvpb.RaftGroupDeletedError{} } else if mark.Is(id.ReplicaID) { return nil // the replica already exists } else if mark.Exists() { - // TODO(pav-kv): there is a replica with an older ReplicaID. We must destroy - // it, and create a new one. Right now, the code falls through and writes - // the new RaftReplicaID, but this replica can already have a non-empty - // HardState. This is a bug. - _ = 0 // make linter happy + // There is a replica with an older ReplicaID. Destroy it before creating + // the new one, to clean up all its state (e.g. HardState, raft log). + oldID := roachpb.FullReplicaID{RangeID: id.RangeID, ReplicaID: mark.ReplicaID} + if err := DestroyReplica(ctx, rw, w, DestroyReplicaInfo{ + FullReplicaID: oldID, + }, id.ReplicaID); err != nil { + return err + } } // Write the RaftReplicaID for this replica. This is the only place in the @@ -141,12 +139,5 @@ func CreateUninitializedReplica( // non-existent. The only RangeID-specific key that can be present is the // RangeTombstone inspected above. w.AddEvent(wagpb.MakeAddr(id, 0), wagpb.EventCreate, nil /* startKey */) - if err := sl.SetRaftReplicaID(ctx, stateRW.WO, id.ReplicaID); err != nil { - return err - } - - // Make sure that storage invariants for this uninitialized replica hold. - uninitDesc := roachpb.RangeDescriptor{RangeID: id.RangeID} - _, err := LoadReplicaState(ctx, stateRW.RO, raftRO, storeID, &uninitDesc, id.ReplicaID) - return err + return sl.SetRaftReplicaID(ctx, rw.State.WO, id.ReplicaID) } diff --git a/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto b/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto index b5991c97e9b8..25b6fd03a81e 100644 --- a/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto +++ b/pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto @@ -102,8 +102,12 @@ message Addr { // a Split event that both initializes the RHS and narrows the LHS. message Node { // Events contains the replica lifecycle events that this node encompasses. - // Each event affects a single replica, identified by its Addr. A given - // RangeID must appear at most once in this list. + // Each event affects a single replica, identified by its Addr. + // + // A RangeID may appear in multiple events when one operation encompasses + // multiple lifecycle transitions for the same range (e.g. destroying an old + // replica and creating a new one). When a RangeID appears multiple times, its + // events must be ordered by occurrence (earlier transitions first). // // Each EventType implicitly carries dependency semantics. For example, a // Split event at raft index I implies the replica must be caught up to I-1 diff --git a/pkg/kv/kvserver/kvstorage/wag/writer.go b/pkg/kv/kvserver/kvstorage/wag/writer.go index 464502756692..0eb5e8be85d4 100644 --- a/pkg/kv/kvserver/kvstorage/wag/writer.go +++ b/pkg/kv/kvserver/kvstorage/wag/writer.go @@ -24,7 +24,6 @@ type Writer struct { seq *Seq // events accumulates the lifecycle events for this node. Each event // identifies a replica and the type of lifecycle transition it undergoes. - // A given RangeID must appear at most once. events []wagpb.Event } @@ -45,8 +44,9 @@ func (w *Writer) Empty() bool { } // AddEvent stages a lifecycle event. Each event identifies a replica and the -// type of lifecycle transition (create, split, destroy, etc.). A given RangeID -// must appear at most once across all events in a Writer. +// type of lifecycle transition (create, split, destroy, etc.). A RangeID may +// appear in multiple events when one operation encompasses multiple transitions +// for the same range (e.g. destroying an old replica and creating a new one). // // The startKey is the range's start key, used during replay to load the range // descriptor via a point read. It may be nil for EventCreate (uninitialized diff --git a/pkg/kv/kvserver/kvstorage/wag_replay.go b/pkg/kv/kvserver/kvstorage/wag_replay.go index b91c934e7526..4ea332f7a2a3 100644 --- a/pkg/kv/kvserver/kvstorage/wag_replay.go +++ b/pkg/kv/kvserver/kvstorage/wag_replay.go @@ -10,6 +10,7 @@ import ( "iter" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" @@ -184,6 +185,39 @@ func (s persistedRangeState) canApply(e wagpb.Event) (apply bool) { return apply } +// advance returns the speculative persistedRangeState after the given event is +// applied. This is used to check subsequent events for the same RangeID within +// a single WAG node. +func (s persistedRangeState) advance(e wagpb.Event) persistedRangeState { + switch e.Type { + case wagpb.EventCreate: + return persistedRangeState{ + mark: ReplicaMark{ + RaftReplicaID: kvserverpb.RaftReplicaID{ReplicaID: e.Addr.ReplicaID}, + RangeTombstone: s.mark.RangeTombstone, + }, + appliedIndex: 0, + } + case wagpb.EventInit, wagpb.EventApply, wagpb.EventSplit, wagpb.EventMerge: + return persistedRangeState{ + mark: s.mark, + appliedIndex: e.Addr.Index, + } + case wagpb.EventDestroy, wagpb.EventSubsume: + // After destruction, ReplicaID is cleared and the tombstone is bumped above + // ReplicaID. We don't know the exact NextReplicaID here, but it's ok to + // speculate and assume the conservative ReplicaID+1. + next := max(s.mark.NextReplicaID, e.Addr.ReplicaID+1) + return persistedRangeState{ + mark: ReplicaMark{ + RangeTombstone: kvserverpb.RangeTombstone{NextReplicaID: next}, + }, + } + default: + panic(errors.AssertionFailedf("unexpected event type %d", e.Type)) + } +} + // raftCatchUp returns the raft index the replica must be caught up to before // this WAG event can be applied. Zero means no catch-up is needed. func raftCatchUp(e wagpb.Event) kvpb.RaftIndex { @@ -208,16 +242,27 @@ func raftCatchUp(e wagpb.Event) kvpb.RaftIndex { // range state to decide if the node still needs applying. // // All events in a node are expected to agree on whether they need applying, -// since they are written and applied atomically. +// since they are written and applied atomically. A RangeID may appear in +// multiple events (e.g. destroy old replica + create new one); when this +// happens, the speculative post-event metadata is computed so that subsequent +// events for the same RangeID are verified incrementally. func canApplyWAGNode(ctx context.Context, node wagpb.Node, stateRO StateRO) (bool, error) { var apply bool + // states caches per-RangeID state so that subsequent events for the same + // RangeID are checked against the expected post-event state. + // + // TODO(pav-kv): can avoid this map if all events pertaining to one RangeID + // are guaranteed to be placed densely, not interleaved by other range IDs. + states := make(map[roachpb.RangeID]persistedRangeState, len(node.Events)) + for i, e := range node.Events { - s, err := loadPersistedRangeState(ctx, stateRO, e.Addr.RangeID) - if err != nil { - return false, errors.Wrapf(err, "loading state for r%d", e.Addr.RangeID) + s, ok := states[e.Addr.RangeID] + if !ok { + var err error + if s, err = loadPersistedRangeState(ctx, stateRO, e.Addr.RangeID); err != nil { + return false, errors.Wrapf(err, "loading state for r%d", e.Addr.RangeID) + } } - // A given RangeID appears at most once in the events list, so the decision - // of whether an event can be applied is independent for each event. if evApply := s.canApply(e); i == 0 { apply = evApply } else if evApply != apply { @@ -226,6 +271,9 @@ func canApplyWAGNode(ctx context.Context, node wagpb.Node, stateRO StateRO) (boo node.Events[0], apply, i, e, evApply, ) } + if apply { + states[e.Addr.RangeID] = s.advance(e) + } } return apply, nil } diff --git a/pkg/kv/kvserver/kvstorage/wag_replay_test.go b/pkg/kv/kvserver/kvstorage/wag_replay_test.go index 6cece1b97169..68dc06b8db7c 100644 --- a/pkg/kv/kvserver/kvstorage/wag_replay_test.go +++ b/pkg/kv/kvserver/kvstorage/wag_replay_test.go @@ -243,6 +243,50 @@ func TestCanApplyWAGNode(t *testing.T) { {Addr: wagpb.Addr{RangeID: 2, ReplicaID: 1, Index: 10}, Type: wagpb.EventInit}, }}, shouldApply: false, + }, { + name: "multi-event destroy+create same range, needs apply", + states: map[roachpb.RangeID]persistedRangeState{ + // Old replica 5 exists, uninitialized. + 1: {mark: replicaMark(5, 0)}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 0}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: true, + }, { + name: "multi-event destroy+create same range, already applied", + states: map[roachpb.RangeID]persistedRangeState{ + // New replica 10 already exists (destroy+create already applied). + 1: {mark: replicaMark(10, 10)}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 0}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: false, + }, { + name: "multi-event destroy+create initialized replica, needs apply", + states: map[roachpb.RangeID]persistedRangeState{ + // Initialized replica 5, applied up to index 42. + 1: {mark: replicaMark(5, 0), appliedIndex: 42}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 42}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: true, + expCatchUps: []raftCatchUpTarget{{rangeID: 1, index: 42}}, + }, { + name: "multi-event destroy+create initialized replica, already applied", + states: map[roachpb.RangeID]persistedRangeState{ + 1: {mark: replicaMark(10, 10)}, + }, + node: wagpb.Node{Events: []wagpb.Event{ + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 42}, Type: wagpb.EventDestroy}, + {Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate}, + }}, + shouldApply: false, }, { name: "multi-event disagreement returns error", states: map[roachpb.RangeID]persistedRangeState{ diff --git a/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go b/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go index 18e08c3a14ef..6de3a6e4a0f8 100644 --- a/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go +++ b/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go @@ -209,9 +209,11 @@ func TestReplicaLifecycleDataDriven(t *testing.T) { )) } else { err = kvstorage.CreateUninitializedReplica( - ctx, kvstorage.WrapState(b.State()), kvstorage.RaftRO(tc.eng.LogEngine()), + ctx, kvstorage.ReadWriter{ + State: kvstorage.WrapState(b.State()), + Raft: kvstorage.Raft{RO: kvstorage.RaftRO(tc.eng.LogEngine()), WO: kvstorage.RaftWO(b.Raft())}, + }, b.WagWriter(), - 1, /* StoreID */ roachpb.FullReplicaID{RangeID: rs.desc.RangeID, ReplicaID: repl.ReplicaID}, ) } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 2139f0b0b95d..ccc67cc23290 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -185,14 +185,15 @@ func (s *Store) tryGetOrCreateReplica( // Replica for this rangeID, and that's us. // Use a read-write batch (not a write-only batch) because - // CreateUninitializedReplica needs to read back the RaftReplicaID it writes - // as part of its LoadReplicaState verification step. + // CreateUninitializedReplica reads the ReplicaMark before writing. b := s.batchFactory.NewBatch() defer b.Close() if err := kvstorage.CreateUninitializedReplica( - ctx, kvstorage.WrapState(b.State()), - kvstorage.RaftRO(s.LogEngine()), - b.WagWriter(), s.StoreID(), id, + ctx, kvstorage.ReadWriter{ + State: kvstorage.WrapState(b.State()), + Raft: kvstorage.Raft{RO: kvstorage.RaftRO(s.LogEngine()), WO: kvstorage.RaftWO(b.Raft())}, + }, + b.WagWriter(), id, ); err != nil { return nil, false, err } diff --git a/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt b/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt index e042e2bb3e75..e76c355bd6be 100644 --- a/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt +++ b/pkg/kv/kvserver/testdata/replica_lifecycle/uninitialized_replica_restart.txt @@ -52,11 +52,14 @@ created replica: (n1,s1):10 state engine: (matches WAG node) log engine: -Put: 0,0 /Local/Store/wag/2 (0x01737761676e000000000000000200): (r1/10:0,EventCreate) +Delete (Sized at 38): 0,0 /Local/RangeID/1/u/RaftHardState (0x016989757266746800): +Put: 0,0 /Local/Store/wag/2 (0x01737761676e000000000000000200): (r1/5:0,EventDestroy) (r1/10:0,EventCreate) +> Put: 0,0 /Local/RangeID/1/u/RangeTombstone (0x016989757266746200): next_replica_id:10 +> Delete: 0,0 /Local/RangeID/1/u/RaftReplicaID (0x016989757266747200): > Put: 0,0 /Local/RangeID/1/u/RaftReplicaID (0x016989757266747200): replica_id:10 -# TODO(pav-kv): HardState of the previous replica should not be inherited. +# The old replica's HardState is cleared by the destroy. print-range-state ---- range desc: r1:{a-d} [(n1,s1):5, (n2,s2):6, (n3,s3):7, next=8, gen=0] - replica (n1/s1): id=10 [uninitialized] HardState={Term:10,Vote:2,Commit:0} + replica (n1/s1): id=10 [uninitialized] HardState={Term:0,Vote:0,Commit:0}