From 4591f13620a9a9c28bc14f8e748e8c679cc61cbe Mon Sep 17 00:00:00 2001 From: DarrylWong Date: Mon, 4 May 2026 18:04:14 -0400 Subject: [PATCH 1/2] logical: extract Metrics into own package We want to start adding metrics to ldr txn mode, including the existing metrics. Txn mode is implemented as individual sub systems which will cause a dependency cycle if we attempt to import the original logical package. This change extracts the existing metrics struct to its own package. Release note: None --- pkg/crosscluster/logical/BUILD.bazel | 4 ++-- pkg/crosscluster/logical/logical_replication_job.go | 7 ++++--- .../logical/logical_replication_job_test.go | 3 ++- .../logical/logical_replication_writer_processor.go | 7 ++++--- pkg/crosscluster/logical/metrics/BUILD.bazel | 12 ++++++++++++ pkg/crosscluster/logical/{ => metrics}/metrics.go | 2 +- pkg/crosscluster/logical/resume_create_table.go | 3 ++- pkg/crosscluster/logical/resume_row.go | 3 ++- 8 files changed, 29 insertions(+), 12 deletions(-) create mode 100644 pkg/crosscluster/logical/metrics/BUILD.bazel rename pkg/crosscluster/logical/{ => metrics}/metrics.go (99%) diff --git a/pkg/crosscluster/logical/BUILD.bazel b/pkg/crosscluster/logical/BUILD.bazel index a2f31497dcdd..7c740c13da6c 100644 --- a/pkg/crosscluster/logical/BUILD.bazel +++ b/pkg/crosscluster/logical/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "logical_replication_writer_processor.go", "lww_kv_processor.go", "lww_row_processor.go", - "metrics.go", "offline_initial_scan_processor.go", "purgatory.go", "resume_create_table.go", @@ -35,6 +34,7 @@ go_library( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrsettings", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnapply", "//pkg/crosscluster/logical/txnmode", @@ -115,7 +115,6 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", - "@com_github_cockroachdb_crlib//crstrings", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", @@ -154,6 +153,7 @@ go_test( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrtestutils", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnwriter", "//pkg/crosscluster/replicationtestutils", diff --git a/pkg/crosscluster/logical/logical_replication_job.go b/pkg/crosscluster/logical/logical_replication_job.go index 970a5b115b50..dd19ad4a23c7 100644 --- a/pkg/crosscluster/logical/logical_replication_job.go +++ b/pkg/crosscluster/logical/logical_replication_job.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -262,7 +263,7 @@ func loadOnlineReplicatedTime( type rowHandler struct { replicatedTimeAtStart hlc.Timestamp frontier span.Frontier - metrics *Metrics + metrics *metrics.Metrics settings *settings.Values job *jobs.Job frontierUpdates chan hlc.Timestamp @@ -547,7 +548,7 @@ func geURIFromLoadedJobDetails(details jobspb.Details) string { } func init() { - m := MakeMetrics(base.DefaultHistogramWindowInterval()) + m := metrics.MakeMetrics(base.DefaultHistogramWindowInterval()) jobs.RegisterConstructor( jobspb.TypeLogicalReplication, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { @@ -562,7 +563,7 @@ func init() { } }, jobs.WithJobMetrics(m), - jobs.WithResolvedMetric(m.(*Metrics).ReplicatedTimeSeconds), + jobs.WithResolvedMetric(m.(*metrics.Metrics).ReplicatedTimeSeconds), jobs.UsesTenantCostControl, ) } diff --git a/pkg/crosscluster/logical/logical_replication_job_test.go b/pkg/crosscluster/logical/logical_replication_job_test.go index 8787fe6db8ab..e15fbec8d1be 100644 --- a/pkg/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/crosscluster/logical/logical_replication_job_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrtestutils" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient" @@ -1952,7 +1953,7 @@ func TestFlushErrorHandling(t *testing.T) { dlq := mockDLQ(0) lrw := &logicalReplicationWriterProcessor{ - metrics: MakeMetrics(0).(*Metrics), + metrics: metrics.MakeMetrics(0).(*metrics.Metrics), dlqClient: &dlq, } writerWorkers.Override(ctx, &serverCfg.Settings.SV, 1) diff --git a/pkg/crosscluster/logical/logical_replication_writer_processor.go b/pkg/crosscluster/logical/logical_replication_writer_processor.go index 1c421a26f968..52b512b98465 100644 --- a/pkg/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/crosscluster/logical/logical_replication_writer_processor.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/crosscluster" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -136,7 +137,7 @@ type logicalReplicationWriterProcessor struct { aggTimer timeutil.Timer // metrics are monitoring all running ingestion jobs. - metrics *Metrics + metrics *metrics.Metrics logBufferEvery log.EveryN @@ -230,7 +231,7 @@ func newLogicalReplicationWriterProcessor( ProcessorID: processorID, }, dlqClient: InitDeadLetterQueueClient(dlqDbExec, destTableBySrcID), - metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics), + metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics), seenEvery: log.Every(1 * time.Minute), retryEvery: log.Every(1 * time.Minute), pacer: kvbulk.NewCPUPacer(ctx, flowCtx.Cfg.DB.KV(), useLowPriority), @@ -294,7 +295,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { ctx = lrw.StartInternal(ctx, logicalReplicationWriterProcessorName, listeners...) - lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) db := lrw.FlowCtx.Cfg.DB diff --git a/pkg/crosscluster/logical/metrics/BUILD.bazel b/pkg/crosscluster/logical/metrics/BUILD.bazel new file mode 100644 index 000000000000..0fc94617e65c --- /dev/null +++ b/pkg/crosscluster/logical/metrics/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "metrics", + srcs = ["metrics.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/metric", + "@com_github_cockroachdb_crlib//crstrings", + ], +) diff --git a/pkg/crosscluster/logical/metrics.go b/pkg/crosscluster/logical/metrics/metrics.go similarity index 99% rename from pkg/crosscluster/logical/metrics.go rename to pkg/crosscluster/logical/metrics/metrics.go index 96a8cc6f5fa2..efe6d1825e84 100644 --- a/pkg/crosscluster/logical/metrics.go +++ b/pkg/crosscluster/logical/metrics/metrics.go @@ -3,7 +3,7 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package logical +package metrics import ( "time" diff --git a/pkg/crosscluster/logical/resume_create_table.go b/pkg/crosscluster/logical/resume_create_table.go index 7bad1ce44002..37fe7c8c44b6 100644 --- a/pkg/crosscluster/logical/resume_create_table.go +++ b/pkg/crosscluster/logical/resume_create_table.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/physical" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -143,7 +144,7 @@ func (r *logicalReplicationResumer) runOfflineInitialScan( } } - metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, diff --git a/pkg/crosscluster/logical/resume_row.go b/pkg/crosscluster/logical/resume_row.go index 7b01e9d815d1..a29b030fd62e 100644 --- a/pkg/crosscluster/logical/resume_row.go +++ b/pkg/crosscluster/logical/resume_row.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/physical" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -120,7 +121,7 @@ func (r *logicalReplicationResumer) ingest( replanOracle, func() time.Duration { return crosscluster.LogicalReplanFrequency.Get(execCfg.SV()) }, ) - metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) // Store only the original plan diagram jobsprofiler.StorePlanDiagram(ctx, From 3918407cc06aa8ae0e2932eadf29cf56bf8a5ab0 Mon Sep 17 00:00:00 2001 From: DarrylWong Date: Mon, 4 May 2026 18:40:10 -0400 Subject: [PATCH 2/2] logical,txnapply: add in flight txn metrics This change adds metrics to see in flight transactions in the applier pipeline. The number is broken down into blocked and ready txns. The former represents the number of txns that are blocked on either another txn or the event horizon. The latter represents the number of txns that are ready to be commited but haven't yet, i.e. on the ready buffer. Both gauges live on the parent metrics.Metrics struct as TxnApplierBlockedTxns and TxnApplierReadyTxns. The applier takes a *metrics.Metrics pointer at construction and updates the gauges directly. Release note: None --- docs/generated/metrics/metrics.yaml | 18 +++ pkg/BUILD.bazel | 1 + pkg/crosscluster/logical/metrics/metrics.go | 23 ++++ pkg/crosscluster/logical/txnapply/BUILD.bazel | 3 + .../logical/txnapply/txn_applier.go | 19 ++- .../logical/txnapply/txn_applier_test.go | 126 +++++++++++++++++- pkg/crosscluster/logical/txnmode/BUILD.bazel | 1 + .../logical/txnmode/ldr_applier_processor.go | 4 + pkg/internal/metricscan/metric_owners.yaml | 2 + 9 files changed, 194 insertions(+), 3 deletions(-) diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml index e1d93066b01e..f3476e8836de 100644 --- a/docs/generated/metrics/metrics.yaml +++ b/docs/generated/metrics/metrics.yaml @@ -7754,6 +7754,24 @@ layers: aggregation: AVG derivative: NONE owner: cockroachdb/cdc + - name: logical_replication.txn_applier.blocked_txns + exported_name: logical_replication_txn_applier_blocked_txns + description: Number of transactions the applier has received but not yet written, blocked on either a txn dependency or the event horizon + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc + - name: logical_replication.txn_applier.ready_txns + exported_name: logical_replication_txn_applier_ready_txns + description: Number of transactions that the applier has received and are ready to be committed or are being committed + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc - name: obs.clustermetrics.flush.count exported_name: obs_clustermetrics_flush_count description: Number of cluster metrics flush operations diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index bc72c1a26892..ef714c8a1c3d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1474,6 +1474,7 @@ GO_TARGETS = [ "//pkg/crosscluster/logical/ldrdecoder:ldrdecoder_test", "//pkg/crosscluster/logical/ldrsettings:ldrsettings", "//pkg/crosscluster/logical/ldrtestutils:ldrtestutils", + "//pkg/crosscluster/logical/metrics:metrics", "//pkg/crosscluster/logical/sqlwriter:sqlwriter", "//pkg/crosscluster/logical/sqlwriter:sqlwriter_test", "//pkg/crosscluster/logical/txnapply:txnapply", diff --git a/pkg/crosscluster/logical/metrics/metrics.go b/pkg/crosscluster/logical/metrics/metrics.go index efe6d1825e84..af569c469040 100644 --- a/pkg/crosscluster/logical/metrics/metrics.go +++ b/pkg/crosscluster/logical/metrics/metrics.go @@ -224,6 +224,22 @@ var ( Measurement: "Ranges", Unit: metric.Unit_COUNT, } + + // Txn-mode applier metrics. + metaTxnApplierBlockedTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.blocked_txns", + Help: "Number of transactions the applier has received but not yet " + + "written, blocked on either a txn dependency or the event horizon", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + } + metaTxnApplierReadyTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.ready_txns", + Help: "Number of transactions that the applier has received and " + + "are ready to be committed or are being committed", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + } ) // Metrics are for production monitoring of logical replication jobs. @@ -268,6 +284,10 @@ type Metrics struct { LabeledEventsDLQed *metric.CounterVec LabeledScanningRanges *metric.GaugeVec LabeledCatchupRanges *metric.GaugeVec + + // Txn-mode applier metrics. The applier updates these directly. + TxnApplierBlockedTxns *metric.Gauge + TxnApplierReadyTxns *metric.Gauge } // MetricStruct implements the metric.Struct interface. @@ -317,5 +337,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { LabeledEventsDLQed: metric.NewExportedCounterVec(metaLabeledEventsDLQed, []string{"label"}), LabeledScanningRanges: metric.NewExportedGaugeVec(metaLabeledScanningRanges, []string{"label"}), LabeledCatchupRanges: metric.NewExportedGaugeVec(metaLabeledCatchupRanges, []string{"label"}), + + TxnApplierBlockedTxns: metric.NewGauge(metaTxnApplierBlockedTxns), + TxnApplierReadyTxns: metric.NewGauge(metaTxnApplierReadyTxns), } } diff --git a/pkg/crosscluster/logical/txnapply/BUILD.bazel b/pkg/crosscluster/logical/txnapply/BUILD.bazel index ebb0d3fbb299..ae73185922d9 100644 --- a/pkg/crosscluster/logical/txnapply/BUILD.bazel +++ b/pkg/crosscluster/logical/txnapply/BUILD.bazel @@ -14,6 +14,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/txnpb", "//pkg/crosscluster/logical/txnwriter", "//pkg/settings", @@ -39,6 +40,7 @@ go_test( embed = [":txnapply"], deps = [ "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/txnwriter", "//pkg/settings/cluster", "//pkg/util/admission", @@ -47,6 +49,7 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/randutil", + "//pkg/util/ring", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/crosscluster/logical/txnapply/txn_applier.go b/pkg/crosscluster/logical/txnapply/txn_applier.go index 0b9c25956387..4ed15ec63998 100644 --- a/pkg/crosscluster/logical/txnapply/txn_applier.go +++ b/pkg/crosscluster/logical/txnapply/txn_applier.go @@ -11,6 +11,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -108,6 +109,7 @@ type Applier struct { id ldrdecoder.ApplierID settings *cluster.Settings depResolver DependencyResolverClient + metrics *metrics.Metrics mu struct { syncutil.Mutex @@ -164,6 +166,7 @@ func NewApplier( depResolver DependencyResolverClient, allApplierIDs []ldrdecoder.ApplierID, newCPUHandle func() *admission.SQLCPUHandle, + metrics *metrics.Metrics, ) (_ *Applier, retErr error) { defer func() { if retErr != nil { @@ -181,10 +184,14 @@ func NewApplier( if newCPUHandle == nil { return nil, errors.AssertionFailedf("newCPUHandle must not be nil") } + if metrics == nil { + return nil, errors.New("metrics must not be nil") + } a := &Applier{ id: id, settings: settings, depResolver: depResolver, + metrics: metrics, txnWriters: writers, newCPUHandle: newCPUHandle, localResolvedTime: MakeLatest[hlc.Timestamp](), @@ -341,6 +348,7 @@ func (a *Applier) recordTransaction(transaction ScheduledTransaction) (bool, err if transaction.remainingDeps == 0 { if transaction.EventHorizon.LessEq(a.getGlobalFrontierLocked()) { + a.metrics.TxnApplierReadyTxns.Inc(1) return true, nil } heap.Push(&a.mu.horizonWaiting, horizonWaiter{ @@ -349,6 +357,7 @@ func (a *Applier) recordTransaction(transaction ScheduledTransaction) (bool, err }) a.registerHorizonWaitLocked(transaction.EventHorizon) } + a.metrics.TxnApplierBlockedTxns.Inc(1) return false, nil } @@ -500,7 +509,11 @@ func (a *Applier) recordCompletion( delete(a.mu.localWaiting, completedID) a.mu.committed.Resolve(completedID) - delete(a.mu.transactions, completedID) + // Don't count synthetic transactions. + if _, ok := a.mu.transactions[completedID]; ok { + delete(a.mu.transactions, completedID) + a.metrics.TxnApplierReadyTxns.Dec(1) + } // Advance the resolved time by draining applied txns from the front // of the ordered txnIDs buffer. @@ -554,6 +567,8 @@ func (a *Applier) resolveDependencyLocked( if waitingTxn.remainingDeps == 0 { if waitingTxn.EventHorizon.LessEq(a.getGlobalFrontierLocked()) { readyBuffer.AddLast(waitingTxn.Transaction) + a.metrics.TxnApplierBlockedTxns.Dec(1) + a.metrics.TxnApplierReadyTxns.Inc(1) } else { heap.Push(&a.mu.horizonWaiting, horizonWaiter{ txnID: waitingID, @@ -610,6 +625,8 @@ func (a *Applier) drainSatisfiedHorizonWaitersLocked( heap.Pop(&a.mu.horizonWaiting) txn := a.mu.transactions[top.txnID] readyBuffer.AddLast(txn.Transaction) + a.metrics.TxnApplierBlockedTxns.Dec(1) + a.metrics.TxnApplierReadyTxns.Inc(1) } } diff --git a/pkg/crosscluster/logical/txnapply/txn_applier_test.go b/pkg/crosscluster/logical/txnapply/txn_applier_test.go index 51c0e2fcaafe..406bbcc81bff 100644 --- a/pkg/crosscluster/logical/txnapply/txn_applier_test.go +++ b/pkg/crosscluster/logical/txnapply/txn_applier_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -388,6 +390,126 @@ func TestTxnApplierIndependent(t *testing.T) { } } +// TestWaitingTxnMetrics tests that the BlockedTxns and ReadyTxns metrics +// are working as intended. +func TestWaitingTxnMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + const localID ldrdecoder.ApplierID = 1 + allIDs := []ldrdecoder.ApplierID{localID} + + st := cluster.MakeTestingClusterSettings() + newApplier := func(t *testing.T) (*Applier, *metrics.Metrics) { + m := metrics.MakeMetrics(0).(*metrics.Metrics) + writers := []txnwriter.TransactionWriter{&benchWriter{}} + depCtx, cancel := context.WithCancel(ctx) + depTracker, depTrackerCleanup := NewTestDependencyTrackerClient(depCtx, allIDs) + a, err := NewApplier(ctx, localID, st, writers, + depTracker, allIDs, testNewCPUHandle, m) + require.NoError(t, err) + t.Cleanup(func() { + a.Close(ctx) + cancel() + _ = depTrackerCleanup() + }) + return a, m + } + + newTxn := func(t *testing.T, a *Applier, ts, eventHorizon int64, deps ...ldrdecoder.TxnID) (bool, ldrdecoder.TxnID) { + id := ldrdecoder.TxnID{ApplierID: localID, Timestamp: hlc.Timestamp{WallTime: ts}} + appliable, err := a.recordTransaction(ScheduledTransaction{ + Transaction: ldrdecoder.Transaction{TxnID: id}, + Dependencies: deps, + EventHorizon: hlc.Timestamp{WallTime: eventHorizon}, + }) + require.NoError(t, err) + return appliable, id + } + + resolveDep := func(t *testing.T, a *Applier, completedID ldrdecoder.TxnID) { + a.mu.Lock() + defer a.mu.Unlock() + err := a.resolveDependencyLocked(completedID, a.mu.localWaiting[completedID], + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + } + + t.Run("basic", func(t *testing.T) { + a, m := newApplier(t) + + // Add a txn with no dependencies, should immediately be placed on the + // ready buffer. + appliable, txn1 := newTxn(t, a, 1, 0) + require.True(t, appliable) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 1, m.TxnApplierReadyTxns.Value()) + + // Add a txn that's blocked on the first txn. + appliable, txn2 := newTxn(t, a, 2, 0, txn1) + require.False(t, appliable) + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 1, m.TxnApplierReadyTxns.Value()) + + // Mark the dependency as resolved for the first txn, which should mark the second txn as ready. + resolveDep(t, a, txn1) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 2, m.TxnApplierReadyTxns.Value()) + + // Complete both txns, gauges should drop to 0. + for _, id := range []ldrdecoder.TxnID{txn1, txn2} { + err := a.recordCompletion(ctx, + appliedTransaction{Transaction: ldrdecoder.Transaction{TxnID: id}}, + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + } + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 0, m.TxnApplierReadyTxns.Value()) + }) + + // Test that a txn that's blocked on multiple dependencies isn't + // marked as ready until we've resolved all dependencies. + t.Run("multiple dependencies", func(t *testing.T) { + a, m := newApplier(t) + _, dep1 := newTxn(t, a, 1, 0) + _, dep2 := newTxn(t, a, 2, 0) + _, dep3 := newTxn(t, a, 3, 0) + + _, _ = newTxn(t, a, 4, 0, dep1, dep2, dep3) + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + + // Resolve two of three: blocked stays at 1, ready unchanged. + resolveDep(t, a, dep1) + resolveDep(t, a, dep2) + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 3, m.TxnApplierReadyTxns.Value()) + + // Resolve the last dep, everything should be ready. + resolveDep(t, a, dep3) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 4, m.TxnApplierReadyTxns.Value()) + }) + + t.Run("horizon blocked", func(t *testing.T) { + a, m := newApplier(t) + // Add a txn that's blocked on the Event Horizon to see its + // correctly marked as blocked. + appliable, _ := newTxn(t, a, 0, 10) + require.False(t, appliable, "horizon blocked txn should not be appliable") + require.EqualValues(t, 1, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 0, m.TxnApplierReadyTxns.Value()) + + // Advance the frontier so we are no longer blocked on the event horizon. + completedTxn, ok := a.processCheckpoint(hlc.Timestamp{WallTime: 11}) + require.True(t, ok) + err := a.recordCompletion(ctx, completedTxn, + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + require.EqualValues(t, 0, m.TxnApplierBlockedTxns.Value()) + require.EqualValues(t, 1, m.TxnApplierReadyTxns.Value()) + }) +} + // mockCoordinator simulates the coordinator's checkpoint behavior in tests. // It sends a checkpoint at txn.timestamp-1 to all appliers when a txn's // EventHorizon exceeds the previous checkpoint (required for correctness), @@ -491,7 +613,7 @@ func runDistributedApplier( for i := range writers { writers[i] = sharedWriter } - a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle) + a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle, metrics.MakeMetrics(0).(*metrics.Metrics)) require.NoError(t, err) inputs[id] = make(chan ApplierEvent, 2*len(dag)+len(ids)+1) @@ -649,7 +771,7 @@ func runBenchApplier(b *testing.B, dag []txnNode, numWritersPerApplier int, rngS for i := range writers { writers[i] = sharedWriter } - a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle) + a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle, metrics.MakeMetrics(0).(*metrics.Metrics)) require.NoError(b, err) inputs[id] = make(chan ApplierEvent, 2*len(dag)+len(ids)+1) appliers[id] = a diff --git a/pkg/crosscluster/logical/txnmode/BUILD.bazel b/pkg/crosscluster/logical/txnmode/BUILD.bazel index 9be6907f8334..ca6dd28d1440 100644 --- a/pkg/crosscluster/logical/txnmode/BUILD.bazel +++ b/pkg/crosscluster/logical/txnmode/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrsettings", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnapply", "//pkg/crosscluster/logical/txnfeed", diff --git a/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go b/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go index 1f5e38d8f8eb..7a07032d73bb 100644 --- a/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go +++ b/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go @@ -10,9 +10,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnapply" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnpb" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -151,6 +153,7 @@ func (p *ldrApplierProcessor) setup(ctx context.Context) error { writers = append(writers, writer) } + m := p.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) p.applier, err = txnapply.NewApplier( ctx, applierID, p.FlowCtx.Cfg.Settings, writers, p.depResolver, p.spec.AllApplierIds, func() *admission.SQLCPUHandle { @@ -160,6 +163,7 @@ func (p *ldrApplierProcessor) setup(ctx context.Context) error { CreateTime: timeutil.Now().UnixNano(), }, false /* atGateway */) }, + m, ) if err != nil { return errors.Wrap(err, "creating applier") diff --git a/pkg/internal/metricscan/metric_owners.yaml b/pkg/internal/metricscan/metric_owners.yaml index 697700e22b42..45a43b83d2d6 100644 --- a/pkg/internal/metricscan/metric_owners.yaml +++ b/pkg/internal/metricscan/metric_owners.yaml @@ -474,6 +474,8 @@ owners: logical_replication_retry_queue_events: cockroachdb/cdc logical_replication_scanning_ranges: cockroachdb/cdc logical_replication_scanning_ranges_by_label: cockroachdb/cdc + logical_replication_txn_applier_blocked_txns: cockroachdb/cdc + logical_replication_txn_applier_ready_txns: cockroachdb/cdc mma_change: cockroachdb/kv mma_change_external_lease_failure: cockroachdb/kv mma_change_external_lease_success: cockroachdb/kv