diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml index e1d93066b01e..f3476e8836de 100644 --- a/docs/generated/metrics/metrics.yaml +++ b/docs/generated/metrics/metrics.yaml @@ -7754,6 +7754,24 @@ layers: aggregation: AVG derivative: NONE owner: cockroachdb/cdc + - name: logical_replication.txn_applier.blocked_txns + exported_name: logical_replication_txn_applier_blocked_txns + description: Number of transactions the applier has received but not yet written, blocked on either a txn dependency or the event horizon + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc + - name: logical_replication.txn_applier.ready_txns + exported_name: logical_replication_txn_applier_ready_txns + description: Number of transactions that the applier has received and are ready to be committed or are being committed + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc - name: obs.clustermetrics.flush.count exported_name: obs_clustermetrics_flush_count description: Number of cluster metrics flush operations diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index bc72c1a26892..ef714c8a1c3d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1474,6 +1474,7 @@ GO_TARGETS = [ "//pkg/crosscluster/logical/ldrdecoder:ldrdecoder_test", "//pkg/crosscluster/logical/ldrsettings:ldrsettings", "//pkg/crosscluster/logical/ldrtestutils:ldrtestutils", + "//pkg/crosscluster/logical/metrics:metrics", "//pkg/crosscluster/logical/sqlwriter:sqlwriter", "//pkg/crosscluster/logical/sqlwriter:sqlwriter_test", "//pkg/crosscluster/logical/txnapply:txnapply", diff --git a/pkg/crosscluster/logical/BUILD.bazel b/pkg/crosscluster/logical/BUILD.bazel index a2f31497dcdd..7c740c13da6c 100644 --- a/pkg/crosscluster/logical/BUILD.bazel +++ b/pkg/crosscluster/logical/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "logical_replication_writer_processor.go", "lww_kv_processor.go", "lww_row_processor.go", - "metrics.go", "offline_initial_scan_processor.go", "purgatory.go", "resume_create_table.go", @@ -35,6 +34,7 @@ go_library( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrsettings", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnapply", "//pkg/crosscluster/logical/txnmode", @@ -115,7 +115,6 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", - "@com_github_cockroachdb_crlib//crstrings", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", @@ -154,6 +153,7 @@ go_test( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrtestutils", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnwriter", "//pkg/crosscluster/replicationtestutils", diff --git a/pkg/crosscluster/logical/logical_replication_job.go b/pkg/crosscluster/logical/logical_replication_job.go index 970a5b115b50..dd19ad4a23c7 100644 --- a/pkg/crosscluster/logical/logical_replication_job.go +++ b/pkg/crosscluster/logical/logical_replication_job.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -262,7 +263,7 @@ func loadOnlineReplicatedTime( type rowHandler struct { replicatedTimeAtStart hlc.Timestamp frontier span.Frontier - metrics *Metrics + metrics *metrics.Metrics settings *settings.Values job *jobs.Job frontierUpdates chan hlc.Timestamp @@ -547,7 +548,7 @@ func geURIFromLoadedJobDetails(details jobspb.Details) string { } func init() { - m := MakeMetrics(base.DefaultHistogramWindowInterval()) + m := metrics.MakeMetrics(base.DefaultHistogramWindowInterval()) jobs.RegisterConstructor( jobspb.TypeLogicalReplication, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { @@ -562,7 +563,7 @@ func init() { } }, jobs.WithJobMetrics(m), - jobs.WithResolvedMetric(m.(*Metrics).ReplicatedTimeSeconds), + jobs.WithResolvedMetric(m.(*metrics.Metrics).ReplicatedTimeSeconds), jobs.UsesTenantCostControl, ) } diff --git a/pkg/crosscluster/logical/logical_replication_job_test.go b/pkg/crosscluster/logical/logical_replication_job_test.go index 8787fe6db8ab..e15fbec8d1be 100644 --- a/pkg/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/crosscluster/logical/logical_replication_job_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrtestutils" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient" @@ -1952,7 +1953,7 @@ func TestFlushErrorHandling(t *testing.T) { dlq := mockDLQ(0) lrw := &logicalReplicationWriterProcessor{ - metrics: MakeMetrics(0).(*Metrics), + metrics: metrics.MakeMetrics(0).(*metrics.Metrics), dlqClient: &dlq, } writerWorkers.Override(ctx, &serverCfg.Settings.SV, 1) diff --git a/pkg/crosscluster/logical/logical_replication_writer_processor.go b/pkg/crosscluster/logical/logical_replication_writer_processor.go index 1c421a26f968..52b512b98465 100644 --- a/pkg/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/crosscluster/logical/logical_replication_writer_processor.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/crosscluster" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -136,7 +137,7 @@ type logicalReplicationWriterProcessor struct { aggTimer timeutil.Timer // metrics are monitoring all running ingestion jobs. - metrics *Metrics + metrics *metrics.Metrics logBufferEvery log.EveryN @@ -230,7 +231,7 @@ func newLogicalReplicationWriterProcessor( ProcessorID: processorID, }, dlqClient: InitDeadLetterQueueClient(dlqDbExec, destTableBySrcID), - metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics), + metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics), seenEvery: log.Every(1 * time.Minute), retryEvery: log.Every(1 * time.Minute), pacer: kvbulk.NewCPUPacer(ctx, flowCtx.Cfg.DB.KV(), useLowPriority), @@ -294,7 +295,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { ctx = lrw.StartInternal(ctx, logicalReplicationWriterProcessorName, listeners...) - lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) db := lrw.FlowCtx.Cfg.DB diff --git a/pkg/crosscluster/logical/metrics/BUILD.bazel b/pkg/crosscluster/logical/metrics/BUILD.bazel new file mode 100644 index 000000000000..0fc94617e65c --- /dev/null +++ b/pkg/crosscluster/logical/metrics/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "metrics", + srcs = ["metrics.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/metric", + "@com_github_cockroachdb_crlib//crstrings", + ], +) diff --git a/pkg/crosscluster/logical/metrics.go b/pkg/crosscluster/logical/metrics/metrics.go similarity index 93% rename from pkg/crosscluster/logical/metrics.go rename to pkg/crosscluster/logical/metrics/metrics.go index 96a8cc6f5fa2..af569c469040 100644 --- a/pkg/crosscluster/logical/metrics.go +++ b/pkg/crosscluster/logical/metrics/metrics.go @@ -3,7 +3,7 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package logical +package metrics import ( "time" @@ -224,6 +224,22 @@ var ( Measurement: "Ranges", Unit: metric.Unit_COUNT, } + + // Txn-mode applier metrics. + metaTxnApplierBlockedTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.blocked_txns", + Help: "Number of transactions the applier has received but not yet " + + "written, blocked on either a txn dependency or the event horizon", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + } + metaTxnApplierReadyTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.ready_txns", + Help: "Number of transactions that the applier has received and " + + "are ready to be committed or are being committed", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + } ) // Metrics are for production monitoring of logical replication jobs. @@ -268,6 +284,10 @@ type Metrics struct { LabeledEventsDLQed *metric.CounterVec LabeledScanningRanges *metric.GaugeVec LabeledCatchupRanges *metric.GaugeVec + + // Txn-mode applier metrics. The applier updates these directly. + TxnApplierBlockedTxns *metric.Gauge + TxnApplierReadyTxns *metric.Gauge } // MetricStruct implements the metric.Struct interface. @@ -317,5 +337,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { LabeledEventsDLQed: metric.NewExportedCounterVec(metaLabeledEventsDLQed, []string{"label"}), LabeledScanningRanges: metric.NewExportedGaugeVec(metaLabeledScanningRanges, []string{"label"}), LabeledCatchupRanges: metric.NewExportedGaugeVec(metaLabeledCatchupRanges, []string{"label"}), + + TxnApplierBlockedTxns: metric.NewGauge(metaTxnApplierBlockedTxns), + TxnApplierReadyTxns: metric.NewGauge(metaTxnApplierReadyTxns), } } diff --git a/pkg/crosscluster/logical/resume_create_table.go b/pkg/crosscluster/logical/resume_create_table.go index 7bad1ce44002..37fe7c8c44b6 100644 --- a/pkg/crosscluster/logical/resume_create_table.go +++ b/pkg/crosscluster/logical/resume_create_table.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/physical" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -143,7 +144,7 @@ func (r *logicalReplicationResumer) runOfflineInitialScan( } } - metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, diff --git a/pkg/crosscluster/logical/resume_row.go b/pkg/crosscluster/logical/resume_row.go index 7b01e9d815d1..a29b030fd62e 100644 --- a/pkg/crosscluster/logical/resume_row.go +++ b/pkg/crosscluster/logical/resume_row.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/physical" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -120,7 +121,7 @@ func (r *logicalReplicationResumer) ingest( replanOracle, func() time.Duration { return crosscluster.LogicalReplanFrequency.Get(execCfg.SV()) }, ) - metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) // Store only the original plan diagram jobsprofiler.StorePlanDiagram(ctx, diff --git a/pkg/crosscluster/logical/txnapply/BUILD.bazel b/pkg/crosscluster/logical/txnapply/BUILD.bazel index ebb0d3fbb299..ae73185922d9 100644 --- a/pkg/crosscluster/logical/txnapply/BUILD.bazel +++ b/pkg/crosscluster/logical/txnapply/BUILD.bazel @@ -14,6 +14,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/txnpb", "//pkg/crosscluster/logical/txnwriter", "//pkg/settings", @@ -39,6 +40,7 @@ go_test( embed = [":txnapply"], deps = [ "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/txnwriter", "//pkg/settings/cluster", "//pkg/util/admission", @@ -47,6 +49,7 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/randutil", + "//pkg/util/ring", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/crosscluster/logical/txnapply/txn_applier.go b/pkg/crosscluster/logical/txnapply/txn_applier.go index 0b9c25956387..4ed15ec63998 100644 --- a/pkg/crosscluster/logical/txnapply/txn_applier.go +++ b/pkg/crosscluster/logical/txnapply/txn_applier.go @@ -11,6 +11,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -108,6 +109,7 @@ type Applier struct { id ldrdecoder.ApplierID settings *cluster.Settings depResolver DependencyResolverClient + metrics *metrics.Metrics mu struct { syncutil.Mutex @@ -164,6 +166,7 @@ func NewApplier( depResolver DependencyResolverClient, allApplierIDs []ldrdecoder.ApplierID, newCPUHandle func() *admission.SQLCPUHandle, + metrics *metrics.Metrics, ) (_ *Applier, retErr error) { defer func() { if retErr != nil { @@ -181,10 +184,14 @@ func NewApplier( if newCPUHandle == nil { return nil, errors.AssertionFailedf("newCPUHandle must not be nil") } + if metrics == nil { + return nil, errors.New("metrics must not be nil") + } a := &Applier{ id: id, settings: settings, depResolver: depResolver, + metrics: metrics, txnWriters: writers, newCPUHandle: newCPUHandle, localResolvedTime: MakeLatest[hlc.Timestamp](), @@ -341,6 +348,7 @@ func (a *Applier) recordTransaction(transaction ScheduledTransaction) (bool, err if transaction.remainingDeps == 0 { if transaction.EventHorizon.LessEq(a.getGlobalFrontierLocked()) { + a.metrics.TxnApplierReadyTxns.Inc(1) return true, nil } heap.Push(&a.mu.horizonWaiting, horizonWaiter{ @@ -349,6 +357,7 @@ func (a *Applier) recordTransaction(transaction ScheduledTransaction) (bool, err }) a.registerHorizonWaitLocked(transaction.EventHorizon) } + a.metrics.TxnApplierBlockedTxns.Inc(1) return false, nil } @@ -500,7 +509,11 @@ func (a *Applier) recordCompletion( delete(a.mu.localWaiting, completedID) a.mu.committed.Resolve(completedID) - delete(a.mu.transactions, completedID) + // Don't count synthetic transactions. + if _, ok := a.mu.transactions[completedID]; ok { + delete(a.mu.transactions, completedID) + a.metrics.TxnApplierReadyTxns.Dec(1) + } // Advance the resolved time by draining applied txns from the front // of the ordered txnIDs buffer. @@ -554,6 +567,8 @@ func (a *Applier) resolveDependencyLocked( if waitingTxn.remainingDeps == 0 { if waitingTxn.EventHorizon.LessEq(a.getGlobalFrontierLocked()) { readyBuffer.AddLast(waitingTxn.Transaction) + a.metrics.TxnApplierBlockedTxns.Dec(1) + a.metrics.TxnApplierReadyTxns.Inc(1) } else { heap.Push(&a.mu.horizonWaiting, horizonWaiter{ txnID: waitingID, @@ -610,6 +625,8 @@ func (a *Applier) drainSatisfiedHorizonWaitersLocked( heap.Pop(&a.mu.horizonWaiting) txn := a.mu.transactions[top.txnID] readyBuffer.AddLast(txn.Transaction) + a.metrics.TxnApplierBlockedTxns.Dec(1) + a.metrics.TxnApplierReadyTxns.Inc(1) } } diff --git a/pkg/crosscluster/logical/txnapply/txn_applier_test.go b/pkg/crosscluster/logical/txnapply/txn_applier_test.go index 51c0e2fcaafe..406bbcc81bff 100644 --- a/pkg/crosscluster/logical/txnapply/txn_applier_test.go +++ b/pkg/crosscluster/logical/txnapply/txn_applier_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -388,6 +390,126 @@ func TestTxnApplierIndependent(t *testing.T) { } } +// TestWaitingTxnMetrics tests that the BlockedTxns and ReadyTxns metrics +// are working as intended. +func TestWaitingTxnMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + const localID ldrdecoder.ApplierID = 1 + allIDs := []ldrdecoder.ApplierID{localID} + + st := cluster.MakeTestingClusterSettings() + newApplier := func(t *testing.T) (*Applier, *metrics.Metrics) { + m := metrics.MakeMetrics(0).(*metrics.Metrics) + writers := []txnwriter.TransactionWriter{&benchWriter{}} + depCtx, cancel := context.WithCancel(ctx) + depTracker, depTrackerCleanup := NewTestDependencyTrackerClient(depCtx, allIDs) + a, err := NewApplier(ctx, localID, st, writers, + depTracker, allIDs, testNewCPUHandle, m) + require.NoError(t, err) + t.Cleanup(func() { + a.Close(ctx) + cancel() + _ = depTrackerCleanup() + }) + return a, m + } + + newTxn := func(t *testing.T, a *Applier, ts, eventHorizon int64, deps ...ldrdecoder.TxnID) (bool, ldrdecoder.TxnID) { + id := ldrdecoder.TxnID{ApplierID: localID, Timestamp: hlc.Timestamp{WallTime: ts}} + appliable, err := a.recordTransaction(ScheduledTransaction{ + Transaction: ldrdecoder.Transaction{TxnID: id}, + Dependencies: deps, + EventHorizon: hlc.Timestamp{WallTime: eventHorizon}, + }) + require.NoError(t, err) + return appliable, id + } + + resolveDep := func(t *testing.T, a *Applier, completedID ldrdecoder.TxnID) { + a.mu.Lock() + defer a.mu.Unlock() + err := a.resolveDependencyLocked(completedID, a.mu.localWaiting[completedID], + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + } + + t.Run("basic", func(t *testing.T) { + a, m := newApplier(t) + + // Add a txn with no dependencies, should immediately be placed on the + // ready buffer. + appliable, txn1 := newTxn(t, a, 1, 0) + require.True(t, appliable) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 1, m.TxnApplierReadyTxns.Value()) + + // Add a txn that's blocked on the first txn. + appliable, txn2 := newTxn(t, a, 2, 0, txn1) + require.False(t, appliable) + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 1, m.TxnApplierReadyTxns.Value()) + + // Mark the dependency as resolved for the first txn, which should mark the second txn as ready. + resolveDep(t, a, txn1) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 2, m.TxnApplierReadyTxns.Value()) + + // Complete both txns, gauges should drop to 0. + for _, id := range []ldrdecoder.TxnID{txn1, txn2} { + err := a.recordCompletion(ctx, + appliedTransaction{Transaction: ldrdecoder.Transaction{TxnID: id}}, + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + } + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 0, m.TxnApplierReadyTxns.Value()) + }) + + // Test that a txn that's blocked on multiple dependencies isn't + // marked as ready until we've resolved all dependencies. + t.Run("multiple dependencies", func(t *testing.T) { + a, m := newApplier(t) + _, dep1 := newTxn(t, a, 1, 0) + _, dep2 := newTxn(t, a, 2, 0) + _, dep3 := newTxn(t, a, 3, 0) + + _, _ = newTxn(t, a, 4, 0, dep1, dep2, dep3) + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + + // Resolve two of three: blocked stays at 1, ready unchanged. + resolveDep(t, a, dep1) + resolveDep(t, a, dep2) + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 3, m.TxnApplierReadyTxns.Value()) + + // Resolve the last dep, everything should be ready. + resolveDep(t, a, dep3) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 4, m.TxnApplierReadyTxns.Value()) + }) + + t.Run("horizon blocked", func(t *testing.T) { + a, m := newApplier(t) + // Add a txn that's blocked on the Event Horizon to see its + // correctly marked as blocked. + appliable, _ := newTxn(t, a, 0, 10) + require.False(t, appliable, "horizon blocked txn should not be appliable") + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 0, m.TxnApplierReadyTxns.Value()) + + // Advance the frontier so we are no longer blocked on the event horizon. + completedTxn, ok := a.processCheckpoint(hlc.Timestamp{WallTime: 11}) + require.True(t, ok) + err := a.recordCompletion(ctx, completedTxn, + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 1, m.TxnApplierReadyTxns.Value()) + }) +} + // mockCoordinator simulates the coordinator's checkpoint behavior in tests. // It sends a checkpoint at txn.timestamp-1 to all appliers when a txn's // EventHorizon exceeds the previous checkpoint (required for correctness), @@ -491,7 +613,7 @@ func runDistributedApplier( for i := range writers { writers[i] = sharedWriter } - a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle) + a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle, metrics.MakeMetrics(0).(*metrics.Metrics)) require.NoError(t, err) inputs[id] = make(chan ApplierEvent, 2*len(dag)+len(ids)+1) @@ -649,7 +771,7 @@ func runBenchApplier(b *testing.B, dag []txnNode, numWritersPerApplier int, rngS for i := range writers { writers[i] = sharedWriter } - a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle) + a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle, metrics.MakeMetrics(0).(*metrics.Metrics)) require.NoError(b, err) inputs[id] = make(chan ApplierEvent, 2*len(dag)+len(ids)+1) appliers[id] = a diff --git a/pkg/crosscluster/logical/txnmode/BUILD.bazel b/pkg/crosscluster/logical/txnmode/BUILD.bazel index 9be6907f8334..ca6dd28d1440 100644 --- a/pkg/crosscluster/logical/txnmode/BUILD.bazel +++ b/pkg/crosscluster/logical/txnmode/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrsettings", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnapply", "//pkg/crosscluster/logical/txnfeed", diff --git a/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go b/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go index 1f5e38d8f8eb..7a07032d73bb 100644 --- a/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go +++ b/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go @@ -10,9 +10,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnapply" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnpb" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -151,6 +153,7 @@ func (p *ldrApplierProcessor) setup(ctx context.Context) error { writers = append(writers, writer) } + m := p.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) p.applier, err = txnapply.NewApplier( ctx, applierID, p.FlowCtx.Cfg.Settings, writers, p.depResolver, p.spec.AllApplierIds, func() *admission.SQLCPUHandle { @@ -160,6 +163,7 @@ func (p *ldrApplierProcessor) setup(ctx context.Context) error { CreateTime: timeutil.Now().UnixNano(), }, false /* atGateway */) }, + m, ) if err != nil { return errors.Wrap(err, "creating applier") diff --git a/pkg/internal/metricscan/metric_owners.yaml b/pkg/internal/metricscan/metric_owners.yaml index 697700e22b42..45a43b83d2d6 100644 --- a/pkg/internal/metricscan/metric_owners.yaml +++ b/pkg/internal/metricscan/metric_owners.yaml @@ -474,6 +474,8 @@ owners: logical_replication_retry_queue_events: cockroachdb/cdc logical_replication_scanning_ranges: cockroachdb/cdc logical_replication_scanning_ranges_by_label: cockroachdb/cdc + logical_replication_txn_applier_blocked_txns: cockroachdb/cdc + logical_replication_txn_applier_ready_txns: cockroachdb/cdc mma_change: cockroachdb/kv mma_change_external_lease_failure: cockroachdb/kv mma_change_external_lease_success: cockroachdb/kv