Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7754,6 +7754,24 @@ layers:
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.blocked_txns
exported_name: logical_replication_txn_applier_blocked_txns
description: Number of transactions the applier has received but not yet written, blocked on either a txn dependency or the event horizon
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: logical_replication.txn_applier.ready_txns
exported_name: logical_replication_txn_applier_ready_txns
description: Number of transactions that the applier has received and are ready to be committed or are being committed
y_axis_label: Transactions
type: GAUGE
unit: COUNT
aggregation: AVG
derivative: NONE
owner: cockroachdb/cdc
- name: obs.clustermetrics.flush.count
exported_name: obs_clustermetrics_flush_count
description: Number of cluster metrics flush operations
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,7 @@ GO_TARGETS = [
"//pkg/crosscluster/logical/ldrdecoder:ldrdecoder_test",
"//pkg/crosscluster/logical/ldrsettings:ldrsettings",
"//pkg/crosscluster/logical/ldrtestutils:ldrtestutils",
"//pkg/crosscluster/logical/metrics:metrics",
"//pkg/crosscluster/logical/sqlwriter:sqlwriter",
"//pkg/crosscluster/logical/sqlwriter:sqlwriter_test",
"//pkg/crosscluster/logical/txnapply:txnapply",
Expand Down
4 changes: 2 additions & 2 deletions pkg/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ go_library(
"logical_replication_writer_processor.go",
"lww_kv_processor.go",
"lww_row_processor.go",
"metrics.go",
"offline_initial_scan_processor.go",
"purgatory.go",
"resume_create_table.go",
Expand All @@ -35,6 +34,7 @@ go_library(
"//pkg/crosscluster",
"//pkg/crosscluster/logical/ldrdecoder",
"//pkg/crosscluster/logical/ldrsettings",
"//pkg/crosscluster/logical/metrics",
"//pkg/crosscluster/logical/sqlwriter",
"//pkg/crosscluster/logical/txnapply",
"//pkg/crosscluster/logical/txnmode",
Expand Down Expand Up @@ -115,7 +115,6 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_crlib//crstrings",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down Expand Up @@ -154,6 +153,7 @@ go_test(
"//pkg/crosscluster",
"//pkg/crosscluster/logical/ldrdecoder",
"//pkg/crosscluster/logical/ldrtestutils",
"//pkg/crosscluster/logical/metrics",
"//pkg/crosscluster/logical/sqlwriter",
"//pkg/crosscluster/logical/txnwriter",
"//pkg/crosscluster/replicationtestutils",
Expand Down
7 changes: 4 additions & 3 deletions pkg/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -262,7 +263,7 @@ func loadOnlineReplicatedTime(
type rowHandler struct {
replicatedTimeAtStart hlc.Timestamp
frontier span.Frontier
metrics *Metrics
metrics *metrics.Metrics
settings *settings.Values
job *jobs.Job
frontierUpdates chan hlc.Timestamp
Expand Down Expand Up @@ -547,7 +548,7 @@ func geURIFromLoadedJobDetails(details jobspb.Details) string {
}

func init() {
m := MakeMetrics(base.DefaultHistogramWindowInterval())
m := metrics.MakeMetrics(base.DefaultHistogramWindowInterval())
jobs.RegisterConstructor(
jobspb.TypeLogicalReplication,
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
Expand All @@ -562,7 +563,7 @@ func init() {
}
},
jobs.WithJobMetrics(m),
jobs.WithResolvedMetric(m.(*Metrics).ReplicatedTimeSeconds),
jobs.WithResolvedMetric(m.(*metrics.Metrics).ReplicatedTimeSeconds),
jobs.UsesTenantCostControl,
)
}
3 changes: 2 additions & 1 deletion pkg/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrtestutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
_ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient"
Expand Down Expand Up @@ -1952,7 +1953,7 @@ func TestFlushErrorHandling(t *testing.T) {

dlq := mockDLQ(0)
lrw := &logicalReplicationWriterProcessor{
metrics: MakeMetrics(0).(*Metrics),
metrics: metrics.MakeMetrics(0).(*metrics.Metrics),
dlqClient: &dlq,
}
writerWorkers.Override(ctx, &serverCfg.Settings.SV, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
Expand Down Expand Up @@ -136,7 +137,7 @@ type logicalReplicationWriterProcessor struct {
aggTimer timeutil.Timer

// metrics are monitoring all running ingestion jobs.
metrics *Metrics
metrics *metrics.Metrics

logBufferEvery log.EveryN

Expand Down Expand Up @@ -230,7 +231,7 @@ func newLogicalReplicationWriterProcessor(
ProcessorID: processorID,
},
dlqClient: InitDeadLetterQueueClient(dlqDbExec, destTableBySrcID),
metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics),
metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics),
seenEvery: log.Every(1 * time.Minute),
retryEvery: log.Every(1 * time.Minute),
pacer: kvbulk.NewCPUPacer(ctx, flowCtx.Cfg.DB.KV(), useLowPriority),
Expand Down Expand Up @@ -294,7 +295,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {

ctx = lrw.StartInternal(ctx, logicalReplicationWriterProcessorName, listeners...)

lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)
lrw.metrics = lrw.FlowCtx.Cfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics)

db := lrw.FlowCtx.Cfg.DB

Expand Down
12 changes: 12 additions & 0 deletions pkg/crosscluster/logical/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library target for the logical replication metrics package
# (pkg/crosscluster/logical/metrics). Visibility is public so that other
# packages can list this target in their deps.
go_library(
name = "metrics",
srcs = ["metrics.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/metric",
"@com_github_cockroachdb_crlib//crstrings",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package logical
package metrics

import (
"time"
Expand Down Expand Up @@ -224,6 +224,22 @@ var (
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}

// Txn-mode applier metrics.
// metaTxnApplierBlockedTxns is the metadata for the gauge of transactions
// the txn-mode applier is holding back: received, but not yet written
// because they still wait on a transaction dependency or on the event
// horizon. A transaction leaves this count when it becomes ready.
metaTxnApplierBlockedTxns = metric.Metadata{
Name: "logical_replication.txn_applier.blocked_txns",
Help: "Number of transactions the applier has received but not yet " +
"written, blocked on either a txn dependency or the event horizon",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
}
// metaTxnApplierReadyTxns is the metadata for the gauge of transactions
// the txn-mode applier has received that are no longer blocked: they are
// either ready to be committed or currently being committed.
metaTxnApplierReadyTxns = metric.Metadata{
Name: "logical_replication.txn_applier.ready_txns",
Help: "Number of transactions that the applier has received and " +
"are ready to be committed or are being committed",
Measurement: "Transactions",
Unit: metric.Unit_COUNT,
}
)

// Metrics are for production monitoring of logical replication jobs.
Expand Down Expand Up @@ -268,6 +284,10 @@ type Metrics struct {
LabeledEventsDLQed *metric.CounterVec
LabeledScanningRanges *metric.GaugeVec
LabeledCatchupRanges *metric.GaugeVec

// Txn-mode applier metrics. The applier updates these directly.
TxnApplierBlockedTxns *metric.Gauge
TxnApplierReadyTxns *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -317,5 +337,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
LabeledEventsDLQed: metric.NewExportedCounterVec(metaLabeledEventsDLQed, []string{"label"}),
LabeledScanningRanges: metric.NewExportedGaugeVec(metaLabeledScanningRanges, []string{"label"}),
LabeledCatchupRanges: metric.NewExportedGaugeVec(metaLabeledCatchupRanges, []string{"label"}),

TxnApplierBlockedTxns: metric.NewGauge(metaTxnApplierBlockedTxns),
TxnApplierReadyTxns: metric.NewGauge(metaTxnApplierReadyTxns),
}
}
3 changes: 2 additions & 1 deletion pkg/crosscluster/logical/resume_create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
Expand Down Expand Up @@ -143,7 +144,7 @@ func (r *logicalReplicationResumer) runOfflineInitialScan(
}
}

metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics)

jobsprofiler.StorePlanDiagram(ctx,
execCfg.DistSQLSrv.Stopper,
Expand Down
3 changes: 2 additions & 1 deletion pkg/crosscluster/logical/resume_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrsettings"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/physical"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
Expand Down Expand Up @@ -120,7 +121,7 @@ func (r *logicalReplicationResumer) ingest(
replanOracle,
func() time.Duration { return crosscluster.LogicalReplanFrequency.Get(execCfg.SV()) },
)
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*metrics.Metrics)

// Store only the original plan diagram
jobsprofiler.StorePlanDiagram(ctx,
Expand Down
3 changes: 3 additions & 0 deletions pkg/crosscluster/logical/txnapply/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/crosscluster/logical/ldrdecoder",
"//pkg/crosscluster/logical/metrics",
"//pkg/crosscluster/logical/txnpb",
"//pkg/crosscluster/logical/txnwriter",
"//pkg/settings",
Expand All @@ -39,6 +40,7 @@ go_test(
embed = [":txnapply"],
deps = [
"//pkg/crosscluster/logical/ldrdecoder",
"//pkg/crosscluster/logical/metrics",
"//pkg/crosscluster/logical/txnwriter",
"//pkg/settings/cluster",
"//pkg/util/admission",
Expand All @@ -47,6 +49,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"//pkg/util/ring",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
19 changes: 18 additions & 1 deletion pkg/crosscluster/logical/txnapply/txn_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrdecoder"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/metrics"
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/txnwriter"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -108,6 +109,7 @@ type Applier struct {
id ldrdecoder.ApplierID
settings *cluster.Settings
depResolver DependencyResolverClient
metrics *metrics.Metrics

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -164,6 +166,7 @@ func NewApplier(
depResolver DependencyResolverClient,
allApplierIDs []ldrdecoder.ApplierID,
newCPUHandle func() *admission.SQLCPUHandle,
metrics *metrics.Metrics,
) (_ *Applier, retErr error) {
defer func() {
if retErr != nil {
Expand All @@ -181,10 +184,14 @@ func NewApplier(
if newCPUHandle == nil {
return nil, errors.AssertionFailedf("newCPUHandle must not be nil")
}
if metrics == nil {
return nil, errors.New("metrics must not be nil")
}
a := &Applier{
id: id,
settings: settings,
depResolver: depResolver,
metrics: metrics,
txnWriters: writers,
newCPUHandle: newCPUHandle,
localResolvedTime: MakeLatest[hlc.Timestamp](),
Expand Down Expand Up @@ -341,6 +348,7 @@ func (a *Applier) recordTransaction(transaction ScheduledTransaction) (bool, err

if transaction.remainingDeps == 0 {
if transaction.EventHorizon.LessEq(a.getGlobalFrontierLocked()) {
a.metrics.TxnApplierReadyTxns.Inc(1)
return true, nil
}
heap.Push(&a.mu.horizonWaiting, horizonWaiter{
Expand All @@ -349,6 +357,7 @@ func (a *Applier) recordTransaction(transaction ScheduledTransaction) (bool, err
})
a.registerHorizonWaitLocked(transaction.EventHorizon)
}
a.metrics.TxnApplierBlockedTxns.Inc(1)
return false, nil
}

Expand Down Expand Up @@ -500,7 +509,11 @@ func (a *Applier) recordCompletion(
delete(a.mu.localWaiting, completedID)

a.mu.committed.Resolve(completedID)
delete(a.mu.transactions, completedID)
// Don't count synthetic transactions.
if _, ok := a.mu.transactions[completedID]; ok {
delete(a.mu.transactions, completedID)
a.metrics.TxnApplierReadyTxns.Dec(1)
}

// Advance the resolved time by draining applied txns from the front
// of the ordered txnIDs buffer.
Expand Down Expand Up @@ -554,6 +567,8 @@ func (a *Applier) resolveDependencyLocked(
if waitingTxn.remainingDeps == 0 {
if waitingTxn.EventHorizon.LessEq(a.getGlobalFrontierLocked()) {
readyBuffer.AddLast(waitingTxn.Transaction)
a.metrics.TxnApplierBlockedTxns.Dec(1)
a.metrics.TxnApplierReadyTxns.Inc(1)
} else {
heap.Push(&a.mu.horizonWaiting, horizonWaiter{
txnID: waitingID,
Expand Down Expand Up @@ -610,6 +625,8 @@ func (a *Applier) drainSatisfiedHorizonWaitersLocked(
heap.Pop(&a.mu.horizonWaiting)
txn := a.mu.transactions[top.txnID]
readyBuffer.AddLast(txn.Transaction)
a.metrics.TxnApplierBlockedTxns.Dec(1)
a.metrics.TxnApplierReadyTxns.Inc(1)
}
}

Expand Down
Loading
Loading