diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml index e1d93066b01e..2c379e295a7c 100644 --- a/docs/generated/metrics/metrics.yaml +++ b/docs/generated/metrics/metrics.yaml @@ -7754,6 +7754,66 @@ layers: aggregation: AVG derivative: NONE owner: cockroachdb/cdc + - name: logical_replication.txn_applier.horizon_wait_txns + exported_name: logical_replication_txn_applier_horizon_wait_txns + labeled_name: 'logical_replication.txn_applier.in_flight_txns{type: horizon_wait}' + description: Number of in-flight transactions waiting for the global frontier to advance past their event horizon + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc + - name: logical_replication.txn_applier.horizon_wait_txns_by_label + exported_name: logical_replication_txn_applier_horizon_wait_txns_by_label + labeled_name: 'logical_replication.txn_applier.in_flight_txns_by_label{type: horizon_wait}' + description: Number of in-flight transactions waiting for the global frontier to advance past their event horizon, by job scope + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc + - name: logical_replication.txn_applier.ready_txns + exported_name: logical_replication_txn_applier_ready_txns + labeled_name: 'logical_replication.txn_applier.in_flight_txns{type: ready}' + description: Number of in-flight transactions that are ready to be applied or currently being applied + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc + - name: logical_replication.txn_applier.ready_txns_by_label + exported_name: logical_replication_txn_applier_ready_txns_by_label + labeled_name: 'logical_replication.txn_applier.in_flight_txns_by_label{type: ready}' + description: Number of in-flight transactions that are ready to be applied or currently being applied, by job scope + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc + - name: logical_replication.txn_applier.txn_wait_txns + exported_name: logical_replication_txn_applier_txn_wait_txns + labeled_name: 'logical_replication.txn_applier.in_flight_txns{type: txn_wait}' + description: Number of in-flight transactions blocked on a peer transaction's completion + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc + - name: logical_replication.txn_applier.txn_wait_txns_by_label + exported_name: logical_replication_txn_applier_txn_wait_txns_by_label + labeled_name: 'logical_replication.txn_applier.in_flight_txns_by_label{type: txn_wait}' + description: Number of in-flight transactions blocked on a peer transaction's completion, by job scope + y_axis_label: Transactions + type: GAUGE + unit: COUNT + aggregation: AVG + derivative: NONE + owner: cockroachdb/cdc - name: obs.clustermetrics.flush.count exported_name: obs_clustermetrics_flush_count description: Number of cluster metrics flush operations diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index bc72c1a26892..ef714c8a1c3d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1474,6 +1474,7 @@ GO_TARGETS = [ "//pkg/crosscluster/logical/ldrdecoder:ldrdecoder_test", "//pkg/crosscluster/logical/ldrsettings:ldrsettings", "//pkg/crosscluster/logical/ldrtestutils:ldrtestutils", + "//pkg/crosscluster/logical/metrics:metrics", "//pkg/crosscluster/logical/sqlwriter:sqlwriter", "//pkg/crosscluster/logical/sqlwriter:sqlwriter_test", "//pkg/crosscluster/logical/txnapply:txnapply", diff --git a/pkg/crosscluster/logical/BUILD.bazel b/pkg/crosscluster/logical/BUILD.bazel index a2f31497dcdd..7c740c13da6c 100644 --- a/pkg/crosscluster/logical/BUILD.bazel +++ b/pkg/crosscluster/logical/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "logical_replication_writer_processor.go", "lww_kv_processor.go", "lww_row_processor.go", - "metrics.go", "offline_initial_scan_processor.go", "purgatory.go", "resume_create_table.go", @@ -35,6 +34,7 @@ go_library( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrsettings", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnapply", "//pkg/crosscluster/logical/txnmode", @@ -115,7 +115,6 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", - "@com_github_cockroachdb_crlib//crstrings", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", @@ -154,6 +153,7 @@ go_test( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrtestutils", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnwriter", "//pkg/crosscluster/replicationtestutils", diff --git a/pkg/crosscluster/logical/ldrdecoder/txn_decoder.go b/pkg/crosscluster/logical/ldrdecoder/txn_decoder.go index ab13aa91af0b..581f0806cfed 100644 --- a/pkg/crosscluster/logical/ldrdecoder/txn_decoder.go +++ b/pkg/crosscluster/logical/ldrdecoder/txn_decoder.go @@ -27,6 +27,7 @@ type TxnID = txnpb.LDRTxnID type Transaction struct { TxnID TxnID WriteSet []DecodedRow + Bytes int64 } // TxnEnvelope pairs a decoded Transaction with the raw KV events it was @@ -73,6 +74,7 @@ func (t *TxnDecoder) DecodeTxn( decoded.RowTimestamp, result.TxnID.Timestamp) } result.WriteSet = append(result.WriteSet, decoded) + result.Bytes += int64(event.Size()) } return result, nil diff --git a/pkg/crosscluster/logical/logical_replication_job.go b/pkg/crosscluster/logical/logical_replication_job.go index 970a5b115b50..dd19ad4a23c7 100644 --- a/pkg/crosscluster/logical/logical_replication_job.go +++ b/pkg/crosscluster/logical/logical_replication_job.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -262,7 +263,7 @@ func loadOnlineReplicatedTime( type rowHandler struct { replicatedTimeAtStart hlc.Timestamp frontier span.Frontier - metrics *Metrics + metrics *metrics.Metrics settings *settings.Values job *jobs.Job frontierUpdates chan hlc.Timestamp @@ -547,7 +548,7 @@ func geURIFromLoadedJobDetails(details jobspb.Details) string { } func init() { - m := MakeMetrics(base.DefaultHistogramWindowInterval()) + m := metrics.MakeMetrics(base.DefaultHistogramWindowInterval()) jobs.RegisterConstructor( jobspb.TypeLogicalReplication, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { @@ -562,7 +563,7 @@ func init() { } }, jobs.WithJobMetrics(m), - jobs.WithResolvedMetric(m.(*Metrics).ReplicatedTimeSeconds), + jobs.WithResolvedMetric(m.(*metrics.Metrics).ReplicatedTimeSeconds), jobs.UsesTenantCostControl, ) } diff --git a/pkg/crosscluster/logical/logical_replication_job_test.go b/pkg/crosscluster/logical/logical_replication_job_test.go index 8787fe6db8ab..e15fbec8d1be 100644 --- a/pkg/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/crosscluster/logical/logical_replication_job_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrtestutils" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient" @@ -1952,7 +1953,7 @@ func TestFlushErrorHandling(t *testing.T) { dlq := mockDLQ(0) lrw := &logicalReplicationWriterProcessor{ - metrics: MakeMetrics(0).(*Metrics), + metrics: metrics.MakeMetrics(0).(*metrics.Metrics), dlqClient: &dlq, } writerWorkers.Override(ctx, &serverCfg.Settings.SV, 1) diff --git a/pkg/crosscluster/logical/logical_replication_writer_processor.go b/pkg/crosscluster/logical/logical_replication_writer_processor.go index 1c421a26f968..52b512b98465 100644 --- a/pkg/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/crosscluster/logical/logical_replication_writer_processor.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/crosscluster" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -136,7 +137,7 @@ type logicalReplicationWriterProcessor struct { aggTimer timeutil.Timer // metrics are monitoring all running ingestion jobs. - metrics *Metrics + metrics *metrics.Metrics logBufferEvery log.EveryN @@ -230,7 +231,7 @@ func newLogicalReplicationWriterProcessor( ProcessorID: processorID, }, dlqClient: InitDeadLetterQueueClient(dlqDbExec, destTableBySrcID), - metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics), + metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics), seenEvery: log.Every(1 * time.Minute), retryEvery: log.Every(1 * time.Minute), pacer: kvbulk.NewCPUPacer(ctx, flowCtx.Cfg.DB.KV(), useLowPriority), @@ -294,7 +295,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { ctx = lrw.StartInternal(ctx, logicalReplicationWriterProcessorName, listeners...) - lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) db := lrw.FlowCtx.Cfg.DB diff --git a/pkg/crosscluster/logical/metrics/BUILD.bazel b/pkg/crosscluster/logical/metrics/BUILD.bazel new file mode 100644 index 000000000000..0fc94617e65c --- /dev/null +++ b/pkg/crosscluster/logical/metrics/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "metrics", + srcs = ["metrics.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/metric", + "@com_github_cockroachdb_crlib//crstrings", + ], +) diff --git a/pkg/crosscluster/logical/metrics.go b/pkg/crosscluster/logical/metrics/metrics.go similarity index 77% rename from pkg/crosscluster/logical/metrics.go rename to pkg/crosscluster/logical/metrics/metrics.go index 96a8cc6f5fa2..c59739520e13 100644 --- a/pkg/crosscluster/logical/metrics.go +++ b/pkg/crosscluster/logical/metrics/metrics.go @@ -3,7 +3,7 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package logical +package metrics import ( "time" @@ -200,7 +200,7 @@ var ( Measurement: "Seconds", Unit: metric.Unit_SECONDS, } - metaLabeledEventsIngetsted = metric.Metadata{ + metaLabeledEventsIngested = metric.Metadata{ Name: "logical_replication.events_ingested_by_label", Help: "Events ingested by all replication jobs by label", Measurement: "Events", @@ -224,6 +224,69 @@ var ( Measurement: "Ranges", Unit: metric.Unit_COUNT, } + + // Txn-mode applier metrics. + metaTxnApplierTxnWaitTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.txn_wait_txns", + Help: "Number of in-flight transactions blocked on a peer transaction's completion", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + LabeledName: "logical_replication.txn_applier.in_flight_txns", + StaticLabels: metric.MakeLabelPairs( + metric.LabelType, "txn_wait", + ), + } + metaTxnApplierHorizonWaitTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.horizon_wait_txns", + Help: "Number of in-flight transactions waiting for the global frontier to advance past their event horizon", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + LabeledName: "logical_replication.txn_applier.in_flight_txns", + StaticLabels: metric.MakeLabelPairs( + metric.LabelType, "horizon_wait", + ), + } + metaTxnApplierReadyTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.ready_txns", + Help: "Number of in-flight transactions that are ready to be applied or currently being applied", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + LabeledName: "logical_replication.txn_applier.in_flight_txns", + StaticLabels: metric.MakeLabelPairs( + metric.LabelType, "ready", + ), + } + + metaLabeledTxnApplierTxnWaitTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.txn_wait_txns_by_label", + Help: "Number of in-flight transactions blocked on a peer transaction's completion, by job scope", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + LabeledName: "logical_replication.txn_applier.in_flight_txns_by_label", + StaticLabels: metric.MakeLabelPairs( + metric.LabelType, "txn_wait", + ), + } + metaLabeledTxnApplierHorizonWaitTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.horizon_wait_txns_by_label", + Help: "Number of in-flight transactions waiting for the global frontier to advance past their event horizon, by job scope", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + LabeledName: "logical_replication.txn_applier.in_flight_txns_by_label", + StaticLabels: metric.MakeLabelPairs( + metric.LabelType, "horizon_wait", + ), + } + metaLabeledTxnApplierReadyTxns = metric.Metadata{ + Name: "logical_replication.txn_applier.ready_txns_by_label", + Help: "Number of in-flight transactions that are ready to be applied or currently being applied, by job scope", + Measurement: "Transactions", + Unit: metric.Unit_COUNT, + LabeledName: "logical_replication.txn_applier.in_flight_txns_by_label", + StaticLabels: metric.MakeLabelPairs( + metric.LabelType, "ready", + ), + } ) // Metrics are for production monitoring of logical replication jobs. @@ -268,6 +331,15 @@ type Metrics struct { LabeledEventsDLQed *metric.CounterVec LabeledScanningRanges *metric.GaugeVec LabeledCatchupRanges *metric.GaugeVec + + // Txn-mode applier metrics. + TxnApplierTxnWaitTxns *metric.Gauge + TxnApplierHorizonWaitTxns *metric.Gauge + TxnApplierReadyTxns *metric.Gauge + + LabeledTxnApplierTxnWaitTxns *metric.GaugeVec + LabeledTxnApplierHorizonWaitTxns *metric.GaugeVec + LabeledTxnApplierReadyTxns *metric.GaugeVec } // MetricStruct implements the metric.Struct interface. @@ -313,9 +385,20 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { // Labeled export-only metrics. LabeledReplicatedTime: metric.NewExportedGaugeVec(metaLabeledReplicatedTime, []string{"label"}), - LabeledEventsIngested: metric.NewExportedCounterVec(metaLabeledEventsIngetsted, []string{"label"}), + LabeledEventsIngested: metric.NewExportedCounterVec(metaLabeledEventsIngested, []string{"label"}), LabeledEventsDLQed: metric.NewExportedCounterVec(metaLabeledEventsDLQed, []string{"label"}), LabeledScanningRanges: metric.NewExportedGaugeVec(metaLabeledScanningRanges, []string{"label"}), LabeledCatchupRanges: metric.NewExportedGaugeVec(metaLabeledCatchupRanges, []string{"label"}), + + TxnApplierTxnWaitTxns: metric.NewGauge(metaTxnApplierTxnWaitTxns), + TxnApplierHorizonWaitTxns: metric.NewGauge(metaTxnApplierHorizonWaitTxns), + TxnApplierReadyTxns: metric.NewGauge(metaTxnApplierReadyTxns), + + LabeledTxnApplierTxnWaitTxns: metric.NewExportedGaugeVec( + metaLabeledTxnApplierTxnWaitTxns, []string{"label"}), + LabeledTxnApplierHorizonWaitTxns: metric.NewExportedGaugeVec( + metaLabeledTxnApplierHorizonWaitTxns, []string{"label"}), + LabeledTxnApplierReadyTxns: metric.NewExportedGaugeVec( + metaLabeledTxnApplierReadyTxns, []string{"label"}), } } diff --git a/pkg/crosscluster/logical/resume_create_table.go b/pkg/crosscluster/logical/resume_create_table.go index 7bad1ce44002..37fe7c8c44b6 100644 --- a/pkg/crosscluster/logical/resume_create_table.go +++ b/pkg/crosscluster/logical/resume_create_table.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/physical" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -143,7 +144,7 @@ func (r *logicalReplicationResumer) runOfflineInitialScan( } } - metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, diff --git a/pkg/crosscluster/logical/resume_row.go b/pkg/crosscluster/logical/resume_row.go index 7b01e9d815d1..a29b030fd62e 100644 --- a/pkg/crosscluster/logical/resume_row.go +++ b/pkg/crosscluster/logical/resume_row.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/physical" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" @@ -120,7 +121,7 @@ func (r *logicalReplicationResumer) ingest( replanOracle, func() time.Duration { return crosscluster.LogicalReplanFrequency.Get(execCfg.SV()) }, ) - metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics) + metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) // Store only the original plan diagram jobsprofiler.StorePlanDiagram(ctx, diff --git a/pkg/crosscluster/logical/table_batch_handler_test.go b/pkg/crosscluster/logical/table_batch_handler_test.go index 1ff4a40055bd..50028a3a0393 100644 --- a/pkg/crosscluster/logical/table_batch_handler_test.go +++ b/pkg/crosscluster/logical/table_batch_handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -274,6 +275,8 @@ func newTxnBatchHandler( s.LeaseManager().(*lease.Manager), s.Codec(), s.ClusterSettings(), + metrics.MakeMetrics(0).(*metrics.Metrics), + "", /* metricsLabel */ ) require.NoError(t, err) diff --git a/pkg/crosscluster/logical/txn_batch_handler.go b/pkg/crosscluster/logical/txn_batch_handler.go index 687f9915a93e..bad346d1511e 100644 --- a/pkg/crosscluster/logical/txn_batch_handler.go +++ b/pkg/crosscluster/logical/txn_batch_handler.go @@ -104,12 +104,16 @@ func newTxnBatchHandlerFromConfig( return nil, err } + // Metrics are recorded by the classic LDR processor that wraps this + // handler; pass nil to avoid double counting. writer, err := txnwriter.NewTransactionWriter( ctx, flowCtx.Cfg.DB.(isql.DB), flowCtx.Cfg.LeaseManager.(*lease.Manager), flowCtx.Codec(), flowCtx.Cfg.Settings, + nil, /* metrics */ + "", /* metricsLabel */ ) if err != nil { return nil, err diff --git a/pkg/crosscluster/logical/txnapply/BUILD.bazel b/pkg/crosscluster/logical/txnapply/BUILD.bazel index c1cc19f3b2bc..2fd54d888347 100644 --- a/pkg/crosscluster/logical/txnapply/BUILD.bazel +++ b/pkg/crosscluster/logical/txnapply/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "txnapply", srcs = [ + "applier_stats_poller.go", "committed_set.go", "dependency_resolver.go", "horizon_heap.go", @@ -14,6 +15,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/txnpb", "//pkg/crosscluster/logical/txnwriter", "//pkg/settings", @@ -40,6 +42,7 @@ go_test( embed = [":txnapply"], deps = [ "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/txnwriter", "//pkg/settings/cluster", "//pkg/util/admission", @@ -48,6 +51,7 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/randutil", + "//pkg/util/ring", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/crosscluster/logical/txnapply/applier_stats_poller.go b/pkg/crosscluster/logical/txnapply/applier_stats_poller.go new file mode 100644 index 000000000000..80dd1d2dc1a5 --- /dev/null +++ b/pkg/crosscluster/logical/txnapply/applier_stats_poller.go @@ -0,0 +1,71 @@ +// Copyright 2026 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package txnapply + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/settings" +) + +var applierStatsPollInterval = settings.RegisterDurationSetting( + settings.ApplicationLevel, + "logical_replication.txn_applier.stats_polling_interval", + "how often the txn-mode applier polls and calculates metrics.", + 30*time.Second, + settings.PositiveDuration, +) + +// startApplierStatsPoller periodically polls to calculate applier metrics. +func (a *Applier) startApplierStatsPoller(ctx context.Context, interval time.Duration) error { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + a.updateInFlightTxnMetrics() + } +} + +// updateInFlightTxnMetrics computes and updates in flight transaction metrics. +func (a *Applier) updateInFlightTxnMetrics() { + txnWait, horizonWait, ready := a.computeInFlightTxnStats() + + a.metrics.TxnApplierTxnWaitTxns.Update(txnWait) + a.metrics.TxnApplierHorizonWaitTxns.Update(horizonWait) + a.metrics.TxnApplierReadyTxns.Update(ready) + + if a.metricsLabel != "" { + labels := map[string]string{"label": a.metricsLabel} + a.metrics.LabeledTxnApplierTxnWaitTxns.Update(labels, txnWait) + a.metrics.LabeledTxnApplierHorizonWaitTxns.Update(labels, horizonWait) + a.metrics.LabeledTxnApplierReadyTxns.Update(labels, ready) + } +} + +// computeInFlightTxnStats returns the count of in-flight transactions in each +// state of the applier. +// N.B. the states are mutually exclusive, e.g. a transaction is only waiting +// on the horizon once all of its dependencies are resolved. +func (a *Applier) computeInFlightTxnStats() (txnWait, horizonWait, ready int64) { + a.mu.Lock() + defer a.mu.Unlock() + globalFrontier := a.getGlobalFrontierLocked() + for _, txn := range a.mu.transactions { + switch { + case txn.remainingDeps > 0: + txnWait++ + case !txn.EventHorizon.LessEq(globalFrontier): + horizonWait++ + } + } + ready = int64(len(a.mu.transactions)) - txnWait - horizonWait + return txnWait, horizonWait, ready +} diff --git a/pkg/crosscluster/logical/txnapply/txn_applier.go b/pkg/crosscluster/logical/txnapply/txn_applier.go index 873b818edb9b..217d7651df67 100644 --- a/pkg/crosscluster/logical/txnapply/txn_applier.go +++ b/pkg/crosscluster/logical/txnapply/txn_applier.go @@ -11,6 +11,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -131,9 +132,11 @@ type Checkpoint struct{ Timestamp hlc.Timestamp } // Also note that the applier assumes it is sent transactions in increasing // timestamp order. type Applier struct { - id ldrdecoder.ApplierID - settings *cluster.Settings - depResolver DependencyResolverClient + id ldrdecoder.ApplierID + settings *cluster.Settings + depResolver DependencyResolverClient + metrics *metrics.Metrics + metricsLabel string mu struct { syncutil.Mutex @@ -190,6 +193,8 @@ func NewApplier( depResolver DependencyResolverClient, allApplierIDs []ldrdecoder.ApplierID, newCPUHandle func() *admission.SQLCPUHandle, + metrics *metrics.Metrics, + metricsLabel string, ) (_ *Applier, retErr error) { defer func() { if retErr != nil { @@ -207,10 +212,15 @@ func NewApplier( if newCPUHandle == nil { return nil, errors.AssertionFailedf("newCPUHandle must not be nil") } + if metrics == nil { + return nil, errors.New("metrics must not be nil") + } a := &Applier{ id: id, settings: settings, depResolver: depResolver, + metrics: metrics, + metricsLabel: metricsLabel, txnWriters: writers, newCPUHandle: newCPUHandle, localResolvedTime: MakeLatest[hlc.Timestamp](), @@ -260,6 +270,11 @@ func (a *Applier) Run(ctx context.Context, input chan ApplierEvent) error { return a.aggregator(ctx, applied, ready) }) + group.GoCtx(func(ctx context.Context) error { + statsInterval := applierStatsPollInterval.Get(&a.settings.SV) + return a.startApplierStatsPoller(ctx, statsInterval) + }) + return group.Wait() } diff --git a/pkg/crosscluster/logical/txnapply/txn_applier_test.go b/pkg/crosscluster/logical/txnapply/txn_applier_test.go index 51c0e2fcaafe..5ec23ecb6d05 100644 --- a/pkg/crosscluster/logical/txnapply/txn_applier_test.go +++ b/pkg/crosscluster/logical/txnapply/txn_applier_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -388,6 +390,126 @@ func TestTxnApplierIndependent(t *testing.T) { } } +// TestInFlightTxnStats verifies that the in-flight-txns gauge breakdown +// computed by the stats poller reflects each transaction's actual state. +func TestInFlightTxnStats(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + const localID ldrdecoder.ApplierID = 1 + allIDs := []ldrdecoder.ApplierID{localID} + + st := cluster.MakeTestingClusterSettings() + newApplier := func(t *testing.T) *Applier { + m := metrics.MakeMetrics(0).(*metrics.Metrics) + writers := []txnwriter.TransactionWriter{&benchWriter{}} + depCtx, cancel := context.WithCancel(ctx) + depTracker, depTrackerCleanup := NewTestDependencyTrackerClient(depCtx, allIDs) + a, err := NewApplier(ctx, localID, st, writers, + depTracker, allIDs, testNewCPUHandle, m, "" /* metricsLabel */) + require.NoError(t, err) + t.Cleanup(func() { + a.Close(ctx) + cancel() + _ = depTrackerCleanup() + }) + return a + } + + newTxn := func(t *testing.T, a *Applier, ts, eventHorizon int64, deps ...ldrdecoder.TxnID) (bool, ldrdecoder.TxnID) { + id := ldrdecoder.TxnID{ApplierID: localID, Timestamp: hlc.Timestamp{WallTime: ts}} + appliable, err := a.recordTransaction(ScheduledTransaction{ + Transaction: ldrdecoder.Transaction{TxnID: id}, + Dependencies: deps, + EventHorizon: hlc.Timestamp{WallTime: eventHorizon}, + }) + require.NoError(t, err) + return appliable, id + } + + resolveDep := func(t *testing.T, a *Applier, completedID ldrdecoder.TxnID) { + a.mu.Lock() + defer a.mu.Unlock() + err := a.resolveDependencyLocked(completedID, a.mu.localWaiting[completedID], + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + } + + requireCounts := func(t *testing.T, a *Applier, txnWait, horizonWait, ready int64) { + t.Helper() + gotTxnWait, gotHorizonWait, gotReady := a.computeInFlightTxnStats() + require.Equal(t, txnWait, gotTxnWait, "txn-wait") + require.Equal(t, horizonWait, gotHorizonWait, "horizon-wait") + require.Equal(t, ready, gotReady, "ready") + } + + t.Run("basic", func(t *testing.T) { + a := newApplier(t) + + // Add a txn with no dependencies, should immediately be placed on the + // ready buffer. + appliable, txn1 := newTxn(t, a, 1, 0) + require.True(t, appliable) + requireCounts(t, a, 0, 0, 1) + + // Add a txn that's blocked on the first txn. + appliable, txn2 := newTxn(t, a, 2, 0, txn1) + require.False(t, appliable) + requireCounts(t, a, 1, 0, 1) + + // Mark the dependency as resolved for the first txn, which should mark the second txn as ready. + resolveDep(t, a, txn1) + requireCounts(t, a, 0, 0, 2) + + // Complete both txns, metrics should drop to 0. + for _, id := range []ldrdecoder.TxnID{txn1, txn2} { + err := a.recordCompletion(ctx, + appliedTransaction{Transaction: ldrdecoder.Transaction{TxnID: id}}, + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + } + requireCounts(t, a, 0, 0, 0) + }) + + // Test that a txn that's blocked on multiple dependencies isn't + // marked as ready until we've resolved all dependencies. + t.Run("multiple dependencies", func(t *testing.T) { + a := newApplier(t) + _, dep1 := newTxn(t, a, 1, 0) + _, dep2 := newTxn(t, a, 2, 0) + _, dep3 := newTxn(t, a, 3, 0) + + _, _ = newTxn(t, a, 4, 0, dep1, dep2, dep3) + requireCounts(t, a, 1, 0, 3) + + // Resolve two of three: blocked stays at 1, ready unchanged. + resolveDep(t, a, dep1) + resolveDep(t, a, dep2) + requireCounts(t, a, 1, 0, 3) + + // Resolve the last dep, everything should be ready. + resolveDep(t, a, dep3) + requireCounts(t, a, 0, 0, 4) + }) + + t.Run("horizon blocked", func(t *testing.T) { + a := newApplier(t) + // Add a txn that's blocked on the Event Horizon to see its + // correctly marked as blocked. + appliable, _ := newTxn(t, a, 0, 10) + require.False(t, appliable, "horizon blocked txn should not be appliable") + requireCounts(t, a, 0, 1, 0) + + // Advance the frontier so we are no longer blocked on the event horizon. + completedTxn, ok := a.processCheckpoint(hlc.Timestamp{WallTime: 11}) + require.True(t, ok) + err := a.recordCompletion(ctx, completedTxn, + new(ring.MakeBuffer[ldrdecoder.Transaction](nil))) + require.NoError(t, err) + requireCounts(t, a, 0, 0, 1) + }) +} + // mockCoordinator simulates the coordinator's checkpoint behavior in tests. // It sends a checkpoint at txn.timestamp-1 to all appliers when a txn's // EventHorizon exceeds the previous checkpoint (required for correctness), @@ -491,7 +613,7 @@ func runDistributedApplier( for i := range writers { writers[i] = sharedWriter } - a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle) + a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle, metrics.MakeMetrics(0).(*metrics.Metrics), "" /* metricsLabel */) require.NoError(t, err) inputs[id] = make(chan ApplierEvent, 2*len(dag)+len(ids)+1) @@ -649,7 +771,7 @@ func runBenchApplier(b *testing.B, dag []txnNode, numWritersPerApplier int, rngS for i := range writers { writers[i] = sharedWriter } - a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle) + a, err := NewApplier(ctx, id, st, writers, depTracker, ids, testNewCPUHandle, metrics.MakeMetrics(0).(*metrics.Metrics), "" /* metricsLabel */) require.NoError(b, err) inputs[id] = make(chan ApplierEvent, 2*len(dag)+len(ids)+1) appliers[id] = a diff --git a/pkg/crosscluster/logical/txnmode/BUILD.bazel b/pkg/crosscluster/logical/txnmode/BUILD.bazel index 9d6b0bdf4ba6..3f35086db5d2 100644 --- a/pkg/crosscluster/logical/txnmode/BUILD.bazel +++ b/pkg/crosscluster/logical/txnmode/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/crosscluster", "//pkg/crosscluster/logical/ldrdecoder", "//pkg/crosscluster/logical/ldrsettings", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/crosscluster/logical/txnapply", "//pkg/crosscluster/logical/txnfeed", diff --git a/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go b/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go index ad2723d71988..0e67f4bbfd62 100644 --- a/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go +++ b/pkg/crosscluster/logical/txnmode/ldr_applier_processor.go @@ -10,9 +10,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnapply" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnpb" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -154,6 +156,7 @@ func (p *ldrApplierProcessor) setup(ctx context.Context) error { sv := &p.FlowCtx.Cfg.Settings.SV numWriters := int(txnNumWriters.Get(sv)) + m := p.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics) writers := make([]txnwriter.TransactionWriter, 0, numWriters) for range numWriters { writer, err := txnwriter.NewTransactionWriter( @@ -162,6 +165,8 @@ func (p *ldrApplierProcessor) setup(ctx context.Context) error { p.FlowCtx.Cfg.LeaseManager.(*lease.Manager), p.FlowCtx.Codec(), p.FlowCtx.Cfg.Settings, + m, + p.spec.MetricsLabel, ) if err != nil { for _, w := range writers { @@ -181,6 +186,7 @@ func (p *ldrApplierProcessor) setup(ctx context.Context) error { CreateTime: timeutil.Now().UnixNano(), }, false /* atGateway */) }, + m, p.spec.MetricsLabel, ) if err != nil { return errors.Wrap(err, "creating applier") diff --git a/pkg/crosscluster/logical/txnmode/ldr_coordinator_processor.go b/pkg/crosscluster/logical/txnmode/ldr_coordinator_processor.go index 6662796306aa..6fed291d73c1 100644 --- a/pkg/crosscluster/logical/txnmode/ldr_coordinator_processor.go +++ b/pkg/crosscluster/logical/txnmode/ldr_coordinator_processor.go @@ -13,12 +13,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnapply" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnfeed" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnlock" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnpb" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnscheduler" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -77,6 +79,8 @@ type ldrCoordinatorProcessor struct { applierIDs []ldrdecoder.ApplierID + metrics *metrics.Metrics + // Internal pipeline channels. batches chan decodedBatch outputEvents chan rowenc.EncDatumRow @@ -294,6 +298,7 @@ func (p *ldrCoordinatorProcessor) runDecode(ctx context.Context) error { return ctx.Err() case p.batches <- decodedBatch{checkpoint: checkpoint}: } + p.metrics.CheckpointEvents.Inc(1) default: continue } @@ -596,7 +601,11 @@ func init() { spec execinfrapb.TxnLDRCoordinatorSpec, post *execinfrapb.PostProcessSpec, ) (execinfra.Processor, error) { - proc := &ldrCoordinatorProcessor{spec: spec} + proc := &ldrCoordinatorProcessor{ + spec: spec, + metrics: flowCtx.Cfg.JobRegistry.MetricsStruct(). + JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics), + } err := proc.Init( ctx, proc, post, coordinatorOutputTypes, flowCtx, processorID, nil, execinfra.ProcStateOpts{ diff --git a/pkg/crosscluster/logical/txnmode/txnmode_dist.go b/pkg/crosscluster/logical/txnmode/txnmode_dist.go index 8ed4f27b3012..81f32aafd3a9 100644 --- a/pkg/crosscluster/logical/txnmode/txnmode_dist.go +++ b/pkg/crosscluster/logical/txnmode/txnmode_dist.go @@ -63,10 +63,11 @@ func PlanTxnReplication( } spec := buildCoordinatorSpec(job, sourcePlan, applierIDs, replicatedTime, endTime) + payload := job.Payload().Details.(*jobspb.Payload_LogicalReplicationDetails).LogicalReplicationDetails plan := planCtx.NewPhysicalPlan() if err := buildTxnReplicationPlan( - ctx, plan, applierInstanceIDs, spec, + ctx, plan, applierInstanceIDs, spec, payload.MetricsLabel, ); err != nil { return nil, nil, nil, err } @@ -84,6 +85,7 @@ func buildTxnReplicationPlan( plan *sql.PhysicalPlan, applierInstanceIDs []base.SQLInstanceID, spec execinfrapb.TxnLDRCoordinatorSpec, + metricsLabel string, ) error { applierIDs := make([]int32, len(applierInstanceIDs)) for i, instanceID := range applierInstanceIDs { @@ -138,6 +140,7 @@ func buildTxnReplicationPlan( AllApplierIds: applierIDs, JobID: spec.JobID, Schema: spec.Schema, + MetricsLabel: metricsLabel, } pIdx := plan.AddProcessor(physicalplan.Processor{ diff --git a/pkg/crosscluster/logical/txnmode/txnmode_dist_test.go b/pkg/crosscluster/logical/txnmode/txnmode_dist_test.go index dfd96b5a01ad..692af6451629 100644 --- a/pkg/crosscluster/logical/txnmode/txnmode_dist_test.go +++ b/pkg/crosscluster/logical/txnmode/txnmode_dist_test.go @@ -75,7 +75,7 @@ func TestBuildTxnReplicationPlan(t *testing.T) { JobID: 123, } - err := buildTxnReplicationPlan(ctx, plan, tc.applierInstanceIDs, spec) + err := buildTxnReplicationPlan(ctx, plan, tc.applierInstanceIDs, spec, "" /* metricsLabel */) require.NoError(t, err) // Count processors by type. diff --git a/pkg/crosscluster/logical/txnwriter/BUILD.bazel b/pkg/crosscluster/logical/txnwriter/BUILD.bazel index 3a4d1b0068dd..3f3c253212e8 100644 --- a/pkg/crosscluster/logical/txnwriter/BUILD.bazel +++ b/pkg/crosscluster/logical/txnwriter/BUILD.bazel @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/crosscluster/logical/sqlwriter", "//pkg/keys", "//pkg/kv", @@ -22,6 +23,7 @@ go_library( "//pkg/sql/isql", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", ], ) @@ -38,6 +40,7 @@ go_test( "//pkg/base", "//pkg/ccl", "//pkg/crosscluster/logical/ldrdecoder", + "//pkg/crosscluster/logical/metrics", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", diff --git a/pkg/crosscluster/logical/txnwriter/apply_batch.go b/pkg/crosscluster/logical/txnwriter/apply_batch.go index c1b25d386fd1..fc0548fd909d 100644 --- a/pkg/crosscluster/logical/txnwriter/apply_batch.go +++ b/pkg/crosscluster/logical/txnwriter/apply_batch.go @@ -7,10 +7,12 @@ package txnwriter import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -36,33 +38,67 @@ func (tw *transactionWriter) ApplyBatch( results := make([]ApplyResult, len(transactions)) + batchStart := timeutil.Now() err := tw.session.Txn(ctx, func(ctx context.Context) error { // We clear the results because the Txn may be retried. clear(results) return tw.tryApply(ctx, transactions, results) }) - if err == nil { - return results, nil - } - if !errors.Is(err, sqlwriter.ErrStalePreviousValue) { - return nil, err - } - - err = tw.session.Txn(ctx, func(ctx context.Context) error { - clear(results) - refreshed, err := tw.refresh(ctx, transactions, results) + if err != nil { + if !errors.Is(err, sqlwriter.ErrStalePreviousValue) { + return nil, err + } + err = tw.session.Txn(ctx, func(ctx context.Context) error { + clear(results) + refreshed, err := tw.refresh(ctx, transactions, results) + if err != nil { + return err + } + return tw.tryApply(ctx, refreshed, results) + }) if err != nil { - return err + return nil, err } - return tw.tryApply(ctx, refreshed, results) - }) - if err != nil { - return nil, err } + tw.recordBatchStats(transactions, results, batchStart, timeutil.Now()) return results, nil } +// recordBatchStats records throughput and latency metrics for a successfully +// applied batch. +func (tw *transactionWriter) recordBatchStats( + transactions []ldrdecoder.Transaction, + results []ApplyResult, + batchStart, batchEnd time.Time, +) { + if tw.metrics == nil { + return + } + totalRows := 0 + for i, transaction := range transactions { + totalRows += len(transaction.WriteSet) + if results[i].DlqReason != nil { + continue + } + tw.metrics.AppliedRowUpdates.Inc(int64(results[i].AppliedRows)) + if tw.metricsLabel != "" { + tw.metrics.LabeledEventsIngested.Inc( + map[string]string{"label": tw.metricsLabel}, + int64(results[i].AppliedRows), + ) + } + tw.metrics.ReceivedLogicalBytes.Inc(transaction.Bytes) + tw.metrics.CommitToCommitLatency.RecordValue( + batchEnd.Sub(transaction.TxnID.Timestamp.GoTime()).Nanoseconds()) + } + nanosPerRow := batchEnd.Sub(batchStart).Nanoseconds() + if totalRows > 0 { + nanosPerRow /= int64(totalRows) + } + tw.metrics.ApplyBatchNanosHist.RecordValue(nanosPerRow) +} + func (tw *transactionWriter) tryApply( ctx context.Context, txn []ldrdecoder.Transaction, results []ApplyResult, ) error { diff --git a/pkg/crosscluster/logical/txnwriter/apply_batch_test.go b/pkg/crosscluster/logical/txnwriter/apply_batch_test.go index 1f1c86b28e90..38bc3894aa4a 100644 --- a/pkg/crosscluster/logical/txnwriter/apply_batch_test.go +++ b/pkg/crosscluster/logical/txnwriter/apply_batch_test.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -31,6 +32,8 @@ func newTxnWriter(t *testing.T, s serverutils.ApplicationLayerInterface) Transac s.LeaseManager().(*lease.Manager), s.Codec(), s.ClusterSettings(), + metrics.MakeMetrics(0).(*metrics.Metrics), + "", /* metricsLabel */ ) require.NoError(t, err) return writer diff --git a/pkg/crosscluster/logical/txnwriter/writer.go b/pkg/crosscluster/logical/txnwriter/writer.go index 53759f544cf9..b47afd5ff631 100644 --- a/pkg/crosscluster/logical/txnwriter/writer.go +++ b/pkg/crosscluster/logical/txnwriter/writer.go @@ -9,6 +9,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder" + "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics" "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -44,12 +45,17 @@ type TransactionWriter interface { Close(ctx context.Context) } +// NewTransactionWriter constructs a writer. A nil metrics is allowed +// to indicate the TransactionWriter should not record metrics; used +// by non transaction mode paths that have their own metrics. func NewTransactionWriter( ctx context.Context, db isql.DB, leaseMgr *lease.Manager, codec keys.SQLCodec, settings *cluster.Settings, + m *metrics.Metrics, + metricsLabel string, ) (TransactionWriter, error) { sd := sql.NewInternalSessionData(ctx, settings, "txn-writer") session, err := sqlwriter.NewInternalSession(ctx, db, sd, settings) @@ -63,6 +69,8 @@ func NewTransactionWriter( codec: codec, sd: sd, settings: settings, + metrics: m, + metricsLabel: metricsLabel, session: session, tableWriters: make(map[descpb.ID]*sqlwriter.RowWriter), tableReaders: make(map[descpb.ID]sqlwriter.RowReader), @@ -71,11 +79,13 @@ func NewTransactionWriter( } type transactionWriter struct { - db isql.DB - leaseMgr *lease.Manager - codec keys.SQLCodec - sd *sessiondata.SessionData - settings *cluster.Settings + db isql.DB + leaseMgr *lease.Manager + codec keys.SQLCodec + sd *sessiondata.SessionData + settings *cluster.Settings + metrics *metrics.Metrics + metricsLabel string session isql.Session tableWriters map[descpb.ID]*sqlwriter.RowWriter diff --git a/pkg/internal/metricscan/metric_owners.yaml b/pkg/internal/metricscan/metric_owners.yaml index 697700e22b42..cf30c4689cee 100644 --- a/pkg/internal/metricscan/metric_owners.yaml +++ b/pkg/internal/metricscan/metric_owners.yaml @@ -474,6 +474,14 @@ owners: logical_replication_retry_queue_events: cockroachdb/cdc logical_replication_scanning_ranges: cockroachdb/cdc logical_replication_scanning_ranges_by_label: cockroachdb/cdc + logical_replication_txn_applier_horizon_wait_txns: cockroachdb/cdc + logical_replication_txn_applier_horizon_wait_txns_by_label: cockroachdb/cdc + logical_replication_txn_applier_in_flight_txns: cockroachdb/cdc + logical_replication_txn_applier_in_flight_txns_by_label: cockroachdb/cdc + logical_replication_txn_applier_ready_txns: cockroachdb/cdc + logical_replication_txn_applier_ready_txns_by_label: cockroachdb/cdc + logical_replication_txn_applier_txn_wait_txns: cockroachdb/cdc + logical_replication_txn_applier_txn_wait_txns_by_label: cockroachdb/cdc mma_change: cockroachdb/kv mma_change_external_lease_failure: cockroachdb/kv mma_change_external_lease_success: cockroachdb/kv diff --git a/pkg/sql/execinfrapb/processors_txn_ldr.proto b/pkg/sql/execinfrapb/processors_txn_ldr.proto index d043fab51fda..364bea046336 100644 --- a/pkg/sql/execinfrapb/processors_txn_ldr.proto +++ b/pkg/sql/execinfrapb/processors_txn_ldr.proto @@ -95,7 +95,11 @@ message TxnLDRApplierSpec { optional LDRSchema schema = 4 [(gogoproto.nullable) = false]; - // NEXT ID: 5. + // MetricsLabel is the label used to group LDR metrics when multiple + // logical replication jobs are running. + optional string metrics_label = 5 [(gogoproto.nullable) = false]; + + // NEXT ID: 6. } // TxnLDRDepResolverSpec is the specification for a dependency resolver processor