Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7754,6 +7754,66 @@ layers:
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.horizon_wait_txns
exported_name: logical_replication_txn_applier_horizon_wait_txns
labeled_name: 'logical_replication.txn_applier.in_flight_txns{type: horizon_wait}'
description: Number of in-flight transactions waiting for the global frontier to advance past their event horizon
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.horizon_wait_txns_by_label
exported_name: logical_replication_txn_applier_horizon_wait_txns_by_label
labeled_name: 'logical_replication.txn_applier.in_flight_txns_by_label{type: horizon_wait}'
description: Number of in-flight transactions waiting for the global frontier to advance past their event horizon, by job scope
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.ready_txns
exported_name: logical_replication_txn_applier_ready_txns
labeled_name: 'logical_replication.txn_applier.in_flight_txns{type: ready}'
description: Number of in-flight transactions that are ready to be applied or currently being applied
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.ready_txns_by_label
exported_name: logical_replication_txn_applier_ready_txns_by_label
labeled_name: 'logical_replication.txn_applier.in_flight_txns_by_label{type: ready}'
description: Number of in-flight transactions that are ready to be applied or currently being applied, by job scope
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.txn_wait_txns
exported_name: logical_replication_txn_applier_txn_wait_txns
labeled_name: 'logical_replication.txn_applier.in_flight_txns{type: txn_wait}'
description: Number of in-flight transactions blocked on a peer transaction's completion
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.txn_wait_txns_by_label
exported_name: logical_replication_txn_applier_txn_wait_txns_by_label
labeled_name: 'logical_replication.txn_applier.in_flight_txns_by_label{type: txn_wait}'
description: Number of in-flight transactions blocked on a peer transaction's completion, by job scope
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: obs.clustermetrics.flush.count
exported_name: obs_clustermetrics_flush_count
description: Number of cluster metrics flush operations
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,7 @@ GO_TARGETS = [
"//pkg/crosscluster/logical/ldrdecoder:ldrdecoder_test",
"//pkg/crosscluster/logical/ldrsettings:ldrsettings",
"//pkg/crosscluster/logical/ldrtestutils:ldrtestutils",
"//pkg/crosscluster/logical/metrics:metrics",
"//pkg/crosscluster/logical/sqlwriter:sqlwriter",
"//pkg/crosscluster/logical/sqlwriter:sqlwriter_test",
"//pkg/crosscluster/logical/txnapply:txnapply",
Expand Down
4 changes: 2 additions & 2 deletions pkg/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ go_library(
"logical_replication_writer_processor.go",
"lww_kv_processor.go",
"lww_row_processor.go",
"metrics.go",
"offline_initial_scan_processor.go",
"purgatory.go",
"resume_create_table.go",
Expand All @@ -35,6 +34,7 @@ go_library(
"//pkg/crosscluster",
"//pkg/crosscluster/logical/ldrdecoder",
"//pkg/crosscluster/logical/ldrsettings",
"//pkg/crosscluster/logical/metrics",
"//pkg/crosscluster/logical/sqlwriter",
"//pkg/crosscluster/logical/txnapply",
"//pkg/crosscluster/logical/txnmode",
Expand Down Expand Up @@ -115,7 +115,6 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_crlib//crstrings",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down Expand Up @@ -154,6 +153,7 @@ go_test(
"//pkg/crosscluster",
"//pkg/crosscluster/logical/ldrdecoder",
"//pkg/crosscluster/logical/ldrtestutils",
"//pkg/crosscluster/logical/metrics",
"//pkg/crosscluster/logical/sqlwriter",
"//pkg/crosscluster/logical/txnwriter",
"//pkg/crosscluster/replicationtestutils",
Expand Down
2 changes: 2 additions & 0 deletions pkg/crosscluster/logical/ldrdecoder/txn_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type TxnID = txnpb.LDRTxnID
type Transaction struct {
// TxnID is the transaction's ID (TxnID is an alias for txnpb.LDRTxnID).
TxnID TxnID
// WriteSet holds the decoded rows of the transaction; DecodeTxn appends
// each row it decodes to this slice.
WriteSet []DecodedRow
// Bytes is the running sum of event.Size() over the events that DecodeTxn
// decoded into WriteSet.
// NOTE(review): presumably a byte count of the raw events — confirm the
// unit returned by event.Size().
Bytes int64
}

// TxnEnvelope pairs a decoded Transaction with the raw KV events it was
Expand Down Expand Up @@ -73,6 +74,7 @@ func (t *TxnDecoder) DecodeTxn(
decoded.RowTimestamp, result.TxnID.Timestamp)
}
result.WriteSet = append(result.WriteSet, decoded)
result.Bytes += int64(event.Size())
}

return result, nil
Expand Down
7 changes: 4 additions & 3 deletions pkg/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -262,7 +263,7 @@ func loadOnlineReplicatedTime(
type rowHandler struct {
replicatedTimeAtStart hlc.Timestamp
frontier span.Frontier
metrics *Metrics
metrics *metrics.Metrics
settings *settings.Values
job *jobs.Job
frontierUpdates chan hlc.Timestamp
Expand Down Expand Up @@ -547,7 +548,7 @@ func geURIFromLoadedJobDetails(details jobspb.Details) string {
}

func init() {
m := MakeMetrics(base.DefaultHistogramWindowInterval())
m := metrics.MakeMetrics(base.DefaultHistogramWindowInterval())
jobs.RegisterConstructor(
jobspb.TypeLogicalReplication,
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
Expand All @@ -562,7 +563,7 @@ func init() {
}
},
jobs.WithJobMetrics(m),
jobs.WithResolvedMetric(m.(*Metrics).ReplicatedTimeSeconds),
jobs.WithResolvedMetric(m.(*metrics.Metrics).ReplicatedTimeSeconds),
jobs.UsesTenantCostControl,
)
}
3 changes: 2 additions & 1 deletion pkg/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrtestutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
_ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient"
Expand Down Expand Up @@ -1952,7 +1953,7 @@ func TestFlushErrorHandling(t *testing.T) {

dlq := mockDLQ(0)
lrw := &logicalReplicationWriterProcessor{
metrics: MakeMetrics(0).(*Metrics),
metrics: metrics.MakeMetrics(0).(*metrics.Metrics),
dlqClient: &dlq,
}
writerWorkers.Override(ctx, &serverCfg.Settings.SV, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
Expand Down Expand Up @@ -136,7 +137,7 @@ type logicalReplicationWriterProcessor struct {
aggTimer timeutil.Timer

// metrics are monitoring all running ingestion jobs.
metrics *Metrics
metrics *metrics.Metrics

logBufferEvery log.EveryN

Expand Down Expand Up @@ -230,7 +231,7 @@ func newLogicalReplicationWriterProcessor(
ProcessorID: processorID,
},
dlqClient: InitDeadLetterQueueClient(dlqDbExec, destTableBySrcID),
metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics),
metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics),
seenEvery: log.Every(1 * time.Minute),
retryEvery: log.Every(1 * time.Minute),
pacer: kvbulk.NewCPUPacer(ctx, flowCtx.Cfg.DB.KV(), useLowPriority),
Expand Down Expand Up @@ -294,7 +295,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {

ctx = lrw.StartInternal(ctx, logicalReplicationWriterProcessorName, listeners...)

lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)
lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics)

db := lrw.FlowCtx.Cfg.DB

Expand Down
12 changes: 12 additions & 0 deletions pkg/crosscluster/logical/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library target for the logical replication metrics package. It builds
# metrics.go under the import path below and is visible to every package in
# the workspace.
go_library(
name = "metrics",
srcs = ["metrics.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/metric",
"@com_github_cockroachdb_crlib//crstrings",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package logical
package metrics

import (
"time"
Expand Down Expand Up @@ -200,7 +200,7 @@ var (
Measurement: "Seconds",
Unit: metric.Unit_SECONDS,
}
metaLabeledEventsIngetsted = metric.Metadata{
metaLabeledEventsIngested = metric.Metadata{
Name: "logical_replication.events_ingested_by_label",
Help: "Events ingested by all replication jobs by label",
Measurement: "Events",
Expand All @@ -224,6 +224,69 @@ var (
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}

// Txn-mode applier metrics.
//
// The three gauges below each keep a distinct Name but share the LabeledName
// "logical_replication.txn_applier.in_flight_txns". Under the labeled name
// they are told apart by the static "type" label, whose values are
// "txn_wait", "horizon_wait" and "ready".
metaTxnApplierTxnWaitTxns = metric.Metadata{
Name: "logical_replication.txn_applier.txn_wait_txns",
Help: "Number of in-flight transactions blocked on a peer transaction's completion",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
LabeledName: "logical_replication.txn_applier.in_flight_txns",
StaticLabels: metric.MakeLabelPairs(
metric.LabelType, "txn_wait",
),
}
metaTxnApplierHorizonWaitTxns = metric.Metadata{
Name: "logical_replication.txn_applier.horizon_wait_txns",
Help: "Number of in-flight transactions waiting for the global frontier to advance past their event horizon",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
LabeledName: "logical_replication.txn_applier.in_flight_txns",
StaticLabels: metric.MakeLabelPairs(
metric.LabelType, "horizon_wait",
),
}
metaTxnApplierReadyTxns = metric.Metadata{
Name: "logical_replication.txn_applier.ready_txns",
Help: "Number of in-flight transactions that are ready to be applied or currently being applied",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
LabeledName: "logical_replication.txn_applier.in_flight_txns",
StaticLabels: metric.MakeLabelPairs(
metric.LabelType, "ready",
),
}

// "_by_label" counterparts of the three gauges above. They share the
// LabeledName "logical_replication.txn_applier.in_flight_txns_by_label" and
// carry the same static type values. MakeMetrics builds them as exported
// gauge vectors with a single "label" dimension.
metaLabeledTxnApplierTxnWaitTxns = metric.Metadata{
Name: "logical_replication.txn_applier.txn_wait_txns_by_label",
Help: "Number of in-flight transactions blocked on a peer transaction's completion, by job scope",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
LabeledName: "logical_replication.txn_applier.in_flight_txns_by_label",
StaticLabels: metric.MakeLabelPairs(
metric.LabelType, "txn_wait",
),
}
metaLabeledTxnApplierHorizonWaitTxns = metric.Metadata{
Name: "logical_replication.txn_applier.horizon_wait_txns_by_label",
Help: "Number of in-flight transactions waiting for the global frontier to advance past their event horizon, by job scope",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
LabeledName: "logical_replication.txn_applier.in_flight_txns_by_label",
StaticLabels: metric.MakeLabelPairs(
metric.LabelType, "horizon_wait",
),
}
metaLabeledTxnApplierReadyTxns = metric.Metadata{
Name: "logical_replication.txn_applier.ready_txns_by_label",
Help: "Number of in-flight transactions that are ready to be applied or currently being applied, by job scope",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
LabeledName: "logical_replication.txn_applier.in_flight_txns_by_label",
StaticLabels: metric.MakeLabelPairs(
metric.LabelType, "ready",
),
}
)

// Metrics are for production monitoring of logical replication jobs.
Expand Down Expand Up @@ -268,6 +331,15 @@ type Metrics struct {
LabeledEventsDLQed *metric.CounterVec
LabeledScanningRanges *metric.GaugeVec
LabeledCatchupRanges *metric.GaugeVec

// Txn-mode applier metrics.
TxnApplierTxnWaitTxns *metric.Gauge
TxnApplierHorizonWaitTxns *metric.Gauge
TxnApplierReadyTxns *metric.Gauge

LabeledTxnApplierTxnWaitTxns *metric.GaugeVec
LabeledTxnApplierHorizonWaitTxns *metric.GaugeVec
LabeledTxnApplierReadyTxns *metric.GaugeVec
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -313,9 +385,20 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {

// Labeled export-only metrics.
LabeledReplicatedTime: metric.NewExportedGaugeVec(metaLabeledReplicatedTime, []string{"label"}),
LabeledEventsIngested: metric.NewExportedCounterVec(metaLabeledEventsIngetsted, []string{"label"}),
LabeledEventsIngested: metric.NewExportedCounterVec(metaLabeledEventsIngested, []string{"label"}),
LabeledEventsDLQed: metric.NewExportedCounterVec(metaLabeledEventsDLQed, []string{"label"}),
LabeledScanningRanges: metric.NewExportedGaugeVec(metaLabeledScanningRanges, []string{"label"}),
LabeledCatchupRanges: metric.NewExportedGaugeVec(metaLabeledCatchupRanges, []string{"label"}),

TxnApplierTxnWaitTxns: metric.NewGauge(metaTxnApplierTxnWaitTxns),
TxnApplierHorizonWaitTxns: metric.NewGauge(metaTxnApplierHorizonWaitTxns),
TxnApplierReadyTxns: metric.NewGauge(metaTxnApplierReadyTxns),

LabeledTxnApplierTxnWaitTxns: metric.NewExportedGaugeVec(
metaLabeledTxnApplierTxnWaitTxns, []string{"label"}),
LabeledTxnApplierHorizonWaitTxns: metric.NewExportedGaugeVec(
metaLabeledTxnApplierHorizonWaitTxns, []string{"label"}),
LabeledTxnApplierReadyTxns: metric.NewExportedGaugeVec(
metaLabeledTxnApplierReadyTxns, []string{"label"}),
}
}
3 changes: 2 additions & 1 deletion pkg/crosscluster/logical/resume_create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
Expand Down Expand Up @@ -143,7 +144,7 @@ func (r *logicalReplicationResumer) runOfflineInitialScan(
}
}

metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics)

jobsprofiler.StorePlanDiagram(ctx,
execCfg.DistSQLSrv.Stopper,
Expand Down
3 changes: 2 additions & 1 deletion pkg/crosscluster/logical/resume_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
Expand Down Expand Up @@ -120,7 +121,7 @@ func (r *logicalReplicationResumer) ingest(
replanOracle,
func() time.Duration { return crosscluster.LogicalReplanFrequency.Get(execCfg.SV()) },
)
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics)

// Store only the original plan diagram
jobsprofiler.StorePlanDiagram(ctx,
Expand Down
3 changes: 3 additions & 0 deletions pkg/crosscluster/logical/table_batch_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
Expand Down Expand Up @@ -274,6 +275,8 @@ func newTxnBatchHandler(
s.LeaseManager().(*lease.Manager),
s.Codec(),
s.ClusterSettings(),
metrics.MakeMetrics(0).(*metrics.Metrics),
"", /* metricsLabel */
)
require.NoError(t, err)

Expand Down
Loading
Loading