Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvstorage/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestCreateUninitializedReplica(t *testing.T) {
ctx := context.Background()
out := testMutateSep(t, "create", e, func(rw ReadWriter, w *wag.Writer) {
require.NoError(t, CreateUninitializedReplica(
ctx, rw.State, rw.Raft.RO, w, 1, /* storeID */
ctx, rw, w,
roachpb.FullReplicaID{RangeID: 123, ReplicaID: 3},
))
})
Expand Down
31 changes: 11 additions & 20 deletions pkg/kv/kvserver/kvstorage/replica_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,29 +109,27 @@ func (r LoadedReplicaState) check(storeID roachpb.StoreID) error {
// Returns kvpb.RaftGroupDeletedError if this replica can not be created
// because it has been deleted.
func CreateUninitializedReplica(
ctx context.Context,
stateRW State,
raftRO RaftRO,
w *wag.Writer,
storeID roachpb.StoreID,
id roachpb.FullReplicaID,
ctx context.Context, rw ReadWriter, w *wag.Writer, id roachpb.FullReplicaID,
) error {
sl := MakeStateLoader(id.RangeID)
// Before creating the replica, see if there is a tombstone or an existing
// replica which would indicate that our ReplicaID is stale and can not come
// back to this Store again.
if mark, err := sl.LoadReplicaMark(ctx, stateRW.RO); err != nil {
if mark, err := sl.LoadReplicaMark(ctx, rw.State.RO); err != nil {
return err
} else if mark.Destroyed(id.ReplicaID) {
return &kvpb.RaftGroupDeletedError{}
} else if mark.Is(id.ReplicaID) {
return nil // the replica already exists
} else if mark.Exists() {
// TODO(pav-kv): there is a replica with an older ReplicaID. We must destroy
// it, and create a new one. Right now, the code falls through and writes
// the new RaftReplicaID, but this replica can already have a non-empty
// HardState. This is a bug.
_ = 0 // make linter happy
// There is a replica with an older ReplicaID. Destroy it before creating
// the new one, to clean up all its state (e.g. HardState, raft log).
oldID := roachpb.FullReplicaID{RangeID: id.RangeID, ReplicaID: mark.ReplicaID}
if err := DestroyReplica(ctx, rw, w, DestroyReplicaInfo{
FullReplicaID: oldID,
}, id.ReplicaID); err != nil {
return err
}
}

// Write the RaftReplicaID for this replica. This is the only place in the
Expand All @@ -141,12 +139,5 @@ func CreateUninitializedReplica(
// non-existent. The only RangeID-specific key that can be present is the
// RangeTombstone inspected above.
w.AddEvent(wagpb.MakeAddr(id, 0), wagpb.EventCreate, nil /* startKey */)
if err := sl.SetRaftReplicaID(ctx, stateRW.WO, id.ReplicaID); err != nil {
return err
}

// Make sure that storage invariants for this uninitialized replica hold.
uninitDesc := roachpb.RangeDescriptor{RangeID: id.RangeID}
_, err := LoadReplicaState(ctx, stateRW.RO, raftRO, storeID, &uninitDesc, id.ReplicaID)
return err
return sl.SetRaftReplicaID(ctx, rw.State.WO, id.ReplicaID)
}
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/kvstorage/wag/wagpb/wag.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ message Addr {
// a Split event that both initializes the RHS and narrows the LHS.
message Node {
// Events contains the replica lifecycle events that this node encompasses.
// Each event affects a single replica, identified by its Addr. A given
// RangeID must appear at most once in this list.
// Each event affects a single replica, identified by its Addr.
//
// A RangeID may appear in multiple events when one operation encompasses
// multiple lifecycle transitions for the same range (e.g. destroying an old
// replica and creating a new one). When a RangeID appears multiple times, its
// events must be ordered by occurrence (earlier transitions first).
//
// Each EventType implicitly carries dependency semantics. For example, a
// Split event at raft index I implies the replica must be caught up to I-1
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/kvstorage/wag/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type Writer struct {
seq *Seq
// events accumulates the lifecycle events for this node. Each event
// identifies a replica and the type of lifecycle transition it undergoes.
// A given RangeID must appear at most once.
events []wagpb.Event
}

Expand All @@ -45,8 +44,9 @@ func (w *Writer) Empty() bool {
}

// AddEvent stages a lifecycle event. Each event identifies a replica and the
// type of lifecycle transition (create, split, destroy, etc.). A given RangeID
// must appear at most once across all events in a Writer.
// type of lifecycle transition (create, split, destroy, etc.). A RangeID may
// appear in multiple events when one operation encompasses multiple transitions
// for the same range (e.g. destroying an old replica and creating a new one).
//
// The startKey is the range's start key, used during replay to load the range
// descriptor via a point read. It may be nil for EventCreate (uninitialized
Expand Down
60 changes: 54 additions & 6 deletions pkg/kv/kvserver/kvstorage/wag_replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"iter"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
Expand Down Expand Up @@ -184,6 +185,39 @@ func (s persistedRangeState) canApply(e wagpb.Event) (apply bool) {
return apply
}

// advance computes the persistedRangeState that is expected to hold once the
// given event has been applied on top of s. It does not touch storage: the
// result is speculative, and is used to check subsequent events for the same
// RangeID within a single WAG node.
//
// It panics with an assertion failure if the event type is not one of the
// known lifecycle transitions.
func (s persistedRangeState) advance(e wagpb.Event) persistedRangeState {
	// Start from the zero state so that any field not explicitly set below is
	// reset rather than carried over from s.
	var out persistedRangeState

	switch e.Type {
	case wagpb.EventCreate:
		// The new replica takes the ReplicaID from the event, keeps the existing
		// tombstone, and has nothing applied yet (appliedIndex stays zero).
		out.mark.RaftReplicaID = kvserverpb.RaftReplicaID{ReplicaID: e.Addr.ReplicaID}
		out.mark.RangeTombstone = s.mark.RangeTombstone

	case wagpb.EventInit, wagpb.EventApply, wagpb.EventSplit, wagpb.EventMerge:
		// The replica identity is unchanged; only the applied index moves to the
		// index addressed by the event.
		out.mark = s.mark
		out.appliedIndex = e.Addr.Index

	case wagpb.EventDestroy, wagpb.EventSubsume:
		// After destruction, ReplicaID is cleared and the tombstone is bumped above
		// ReplicaID. We don't know the exact NextReplicaID here, but it's ok to
		// speculate and assume the conservative ReplicaID+1.
		bumped := max(s.mark.NextReplicaID, e.Addr.ReplicaID+1)
		out.mark.RangeTombstone = kvserverpb.RangeTombstone{NextReplicaID: bumped}

	default:
		panic(errors.AssertionFailedf("unexpected event type %d", e.Type))
	}

	return out
}

// raftCatchUp returns the raft index the replica must be caught up to before
// this WAG event can be applied. Zero means no catch-up is needed.
func raftCatchUp(e wagpb.Event) kvpb.RaftIndex {
Expand All @@ -208,16 +242,27 @@ func raftCatchUp(e wagpb.Event) kvpb.RaftIndex {
// range state to decide if the node still needs applying.
//
// All events in a node are expected to agree on whether they need applying,
// since they are written and applied atomically.
// since they are written and applied atomically. A RangeID may appear in
// multiple events (e.g. destroy old replica + create new one); when this
// happens, the speculative post-event metadata is computed so that subsequent
// events for the same RangeID are verified incrementally.
func canApplyWAGNode(ctx context.Context, node wagpb.Node, stateRO StateRO) (bool, error) {
var apply bool
// states caches per-RangeID state so that subsequent events for the same
// RangeID are checked against the expected post-event state.
//
// TODO(pav-kv): can avoid this map if all events pertaining to one RangeID
// are guaranteed to be placed densely, not interleaved by other range IDs.
states := make(map[roachpb.RangeID]persistedRangeState, len(node.Events))

for i, e := range node.Events {
s, err := loadPersistedRangeState(ctx, stateRO, e.Addr.RangeID)
if err != nil {
return false, errors.Wrapf(err, "loading state for r%d", e.Addr.RangeID)
s, ok := states[e.Addr.RangeID]
if !ok {
var err error
if s, err = loadPersistedRangeState(ctx, stateRO, e.Addr.RangeID); err != nil {
return false, errors.Wrapf(err, "loading state for r%d", e.Addr.RangeID)
}
}
// A given RangeID appears at most once in the events list, so the decision
// of whether an event can be applied is independent for each event.
if evApply := s.canApply(e); i == 0 {
apply = evApply
} else if evApply != apply {
Expand All @@ -226,6 +271,9 @@ func canApplyWAGNode(ctx context.Context, node wagpb.Node, stateRO StateRO) (boo
node.Events[0], apply, i, e, evApply,
)
}
if apply {
states[e.Addr.RangeID] = s.advance(e)
}
}
return apply, nil
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/kv/kvserver/kvstorage/wag_replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,50 @@ func TestCanApplyWAGNode(t *testing.T) {
{Addr: wagpb.Addr{RangeID: 2, ReplicaID: 1, Index: 10}, Type: wagpb.EventInit},
}},
shouldApply: false,
}, {
name: "multi-event destroy+create same range, needs apply",
states: map[roachpb.RangeID]persistedRangeState{
// Old replica 5 exists, uninitialized.
1: {mark: replicaMark(5, 0)},
},
node: wagpb.Node{Events: []wagpb.Event{
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 0}, Type: wagpb.EventDestroy},
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate},
}},
shouldApply: true,
}, {
name: "multi-event destroy+create same range, already applied",
states: map[roachpb.RangeID]persistedRangeState{
// New replica 10 already exists (destroy+create already applied).
1: {mark: replicaMark(10, 10)},
},
node: wagpb.Node{Events: []wagpb.Event{
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 0}, Type: wagpb.EventDestroy},
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate},
}},
shouldApply: false,
}, {
name: "multi-event destroy+create initialized replica, needs apply",
states: map[roachpb.RangeID]persistedRangeState{
// Initialized replica 5, applied up to index 42.
1: {mark: replicaMark(5, 0), appliedIndex: 42},
},
node: wagpb.Node{Events: []wagpb.Event{
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 42}, Type: wagpb.EventDestroy},
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate},
}},
shouldApply: true,
expCatchUps: []raftCatchUpTarget{{rangeID: 1, index: 42}},
}, {
name: "multi-event destroy+create initialized replica, already applied",
states: map[roachpb.RangeID]persistedRangeState{
1: {mark: replicaMark(10, 10)},
},
node: wagpb.Node{Events: []wagpb.Event{
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 5, Index: 42}, Type: wagpb.EventDestroy},
{Addr: wagpb.Addr{RangeID: 1, ReplicaID: 10, Index: 0}, Type: wagpb.EventCreate},
}},
shouldApply: false,
}, {
name: "multi-event disagreement returns error",
states: map[roachpb.RangeID]persistedRangeState{
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_lifecycle_datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ func TestReplicaLifecycleDataDriven(t *testing.T) {
))
} else {
err = kvstorage.CreateUninitializedReplica(
ctx, kvstorage.WrapState(b.State()), kvstorage.RaftRO(tc.eng.LogEngine()),
ctx, kvstorage.ReadWriter{
State: kvstorage.WrapState(b.State()),
Raft: kvstorage.Raft{RO: kvstorage.RaftRO(tc.eng.LogEngine()), WO: kvstorage.RaftWO(b.Raft())},
},
b.WagWriter(),
1, /* StoreID */
roachpb.FullReplicaID{RangeID: rs.desc.RangeID, ReplicaID: repl.ReplicaID},
)
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,15 @@ func (s *Store) tryGetOrCreateReplica(
// Replica for this rangeID, and that's us.

// Use a read-write batch (not a write-only batch) because
// CreateUninitializedReplica needs to read back the RaftReplicaID it writes
// as part of its LoadReplicaState verification step.
// CreateUninitializedReplica reads the ReplicaMark before writing.
b := s.batchFactory.NewBatch()
defer b.Close()
if err := kvstorage.CreateUninitializedReplica(
ctx, kvstorage.WrapState(b.State()),
kvstorage.RaftRO(s.LogEngine()),
b.WagWriter(), s.StoreID(), id,
ctx, kvstorage.ReadWriter{
State: kvstorage.WrapState(b.State()),
Raft: kvstorage.Raft{RO: kvstorage.RaftRO(s.LogEngine()), WO: kvstorage.RaftWO(b.Raft())},
},
b.WagWriter(), id,
); err != nil {
return nil, false, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ created replica: (n1,s1):10
state engine:
(matches WAG node)
log engine:
Put: 0,0 /Local/Store/wag/2 (0x01737761676e000000000000000200): (r1/10:0,EventCreate)
Delete (Sized at 38): 0,0 /Local/RangeID/1/u/RaftHardState (0x016989757266746800):
Put: 0,0 /Local/Store/wag/2 (0x01737761676e000000000000000200): (r1/5:0,EventDestroy) (r1/10:0,EventCreate)
> Put: 0,0 /Local/RangeID/1/u/RangeTombstone (0x016989757266746200): next_replica_id:10
> Delete: 0,0 /Local/RangeID/1/u/RaftReplicaID (0x016989757266747200):
> Put: 0,0 /Local/RangeID/1/u/RaftReplicaID (0x016989757266747200): replica_id:10

# TODO(pav-kv): HardState of the previous replica should not be inherited.
# The old replica's HardState is cleared by the destroy.
print-range-state
----
range desc: r1:{a-d} [(n1,s1):5, (n2,s2):6, (n3,s3):7, next=8, gen=0]
replica (n1/s1): id=10 [uninitialized] HardState={Term:10,Vote:2,Commit:0}
replica (n1/s1): id=10 [uninitialized] HardState={Term:0,Vote:0,Commit:0}
Loading