Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/internal/metricscan/metric_owners.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,12 @@ owners:
security_certificate_ttl_node_client: cockroachdb/security-engineering
security_certificate_ttl_ui: cockroachdb/security-engineering
security_certificate_ttl_ui_ca: cockroachdb/security-engineering
server_controller_mux_virtual_cluster_wait_admitted: cockroachdb/server
server_controller_mux_virtual_cluster_wait_canceled: cockroachdb/server
server_controller_mux_virtual_cluster_wait_rejected: cockroachdb/server
server_controller_mux_virtual_cluster_wait_success: cockroachdb/server
server_controller_mux_virtual_cluster_wait_timeout: cockroachdb/server
server_controller_mux_virtual_cluster_wait_waiters: cockroachdb/server
server_http_request_duration_nanos: cockroachdb/unowned
spanconfig_kvsubscriber_oldest_protected_record_nanos: cockroachdb/kv
spanconfig_kvsubscriber_protected_record_count: cockroachdb/kv
Expand Down
5 changes: 4 additions & 1 deletion pkg/multitenant/mtinfopb/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ const (
// DataStateAdd indicates tenant data is being added. Not available
// for SQL sessions.
DataStateAdd TenantDataState = 0
// DataStateReady indicates data is ready and SQL servers can access it.
// DataStateReady indicates that the tenant keyspace is eligible for
// tenant-server startup/bootstrap. It does not imply that any particular SQL
// server instance has completed runtime initialization or is routable for
// queries.
DataStateReady TenantDataState = 1
// DataStateDrop indicates tenant data is being deleted. Not
// available for SQL sessions.
Expand Down
16 changes: 15 additions & 1 deletion pkg/multitenant/tenant_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,24 @@ var DefaultTenantSelect = settings.RegisterStringSetting(

// WaitForClusterStartTimeout is the amount of time the tenant
// controller will wait for the default virtual cluster to have an
// active, routable SQL server. The tenant's durable data_state=ready
// state only makes it eligible for startup; this timeout waits for
// runtime SQL-serving readiness. A value of 0 disables the wait.
var WaitForClusterStartTimeout = settings.RegisterDurationSetting(
settings.SystemOnly,
"server.controller.mux_virtual_cluster_wait.timeout",
"amount of time to wait for a default virtual cluster to become available when serving SQL connections (0 to disable)",
10*time.Second,
)

// WaitForClusterStartMaxConcurrent is the maximum number of SQL connections
// that may wait concurrently for the default virtual cluster to become
// routable. Connections above this limit fail fast to avoid holding unbounded
// TCP connections while a tenant is starting.
//
// The setting defaults to 10 and may not be negative. The admission check in
// the server controller rejects a connection when the current waiter count is
// greater than or equal to this value, so a value of 0 admits no waiters at
// all: every connection that would have waited is rejected immediately.
var WaitForClusterStartMaxConcurrent = settings.RegisterIntSetting(
settings.SystemOnly,
"server.controller.mux_virtual_cluster_wait.max_concurrent",
"maximum number of SQL connections that may wait concurrently for the default virtual cluster to become available",
10,
settings.IntWithMinimum(0),
)
4 changes: 4 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"server_controller.go",
"server_controller_channel_orchestrator.go",
"server_controller_http.go",
"server_controller_metrics.go",
"server_controller_new_server.go",
"server_controller_sql.go",
"server_http.go",
Expand Down Expand Up @@ -572,6 +573,7 @@ go_test(
"//pkg/server/authserver",
"//pkg/server/privchecker",
"//pkg/server/rangetestutils",
"//pkg/server/serverctl",
"//pkg/server/serverpb",
"//pkg/server/srvtestutils",
"//pkg/server/status",
Expand All @@ -590,6 +592,7 @@ go_test(
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/sql/pgwire",
"//pkg/sql/pgwire/pgwirecancel",
"//pkg/sql/roleoption",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/tree",
Expand Down Expand Up @@ -637,6 +640,7 @@ go_test(
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil/singleflight",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
systemTenantNameContainer,
pgPreServer.SendRoutingError,
tenantCapabilitiesWatcher,
appRegistry,
cfg.DisableSQLServer,
cfg.BaseConfig.DisableTLSForHTTP,
cfg.Insecure,
Expand Down
11 changes: 11 additions & 0 deletions pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
Expand Down Expand Up @@ -83,6 +85,11 @@ type serverController struct {
sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName)

tenantWaiter *singleflight.Group
metrics *serverControllerMetrics

muxWaiters atomic.Int64
muxRejectEvery log.EveryN
muxTimeoutEvery log.EveryN

// draining is set when the surrounding server starts draining, and
// prevents further creation of new tenant servers.
Expand Down Expand Up @@ -135,6 +142,7 @@ func newServerController(
systemTenantNameContainer *roachpb.TenantNameContainer,
sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName),
watcher *tenantcapabilitieswatcher.Watcher,
registry *metric.Registry,
disableSQLServer bool,
disableTLSForHTTP bool,
insecure bool,
Expand All @@ -149,10 +157,13 @@ func newServerController(
sendSQLRoutingError: sendSQLRoutingError,
watcher: watcher,
tenantWaiter: singleflight.NewGroup("tenant server poller", "poll"),
metrics: makeServerControllerMetrics(registry),
drainCh: make(chan struct{}),
disableSQLServer: disableSQLServer,
disableTLSForHTTP: disableTLSForHTTP,
insecure: insecure,
muxRejectEvery: log.Every(10 * time.Second),
muxTimeoutEvery: log.Every(10 * time.Second),
}
c.orchestrator = newChannelOrchestrator(parentStopper, c)
c.mu.servers = map[roachpb.TenantName]*serverState{
Expand Down
10 changes: 9 additions & 1 deletion pkg/server/server_controller_channel_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ type serverState struct {

// server is the server that is being controlled.
// This is only set when the corresponding orchestratedServer
// instance is ready.
// instance is ready to accept SQL traffic. For tenant servers, this is
// the runtime routable boundary: startControlledServer only publishes the
// server here after preStart() and acceptClients() have both succeeded.
// Durable tenant state such as data_state=ready only makes a tenant
// eligible for startup; it does not populate this field.
server orchestratedServer
}

Expand Down Expand Up @@ -390,6 +394,10 @@ func (o *channelOrchestrator) startControlledServer(
return nil, errors.Wrap(err, "while starting graceful drain propagation task")
}

// Publishing into state.startedMu.server below is the routing
// boundary. Keep acceptClients before that publication so SQL
// routing cannot observe a tenant server until it is actually
// ready to serve client connections.
return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients")
}()
if err != nil {
Expand Down
75 changes: 75 additions & 0 deletions pkg/server/server_controller_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2026 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package server

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// Metadata for the metrics that describe SQL connections waiting for the
// default virtual cluster to become routable. Each entry is consumed by
// makeServerControllerMetrics to build the corresponding gauge or counter.
var (
// metaMuxWaiters describes the gauge of connections currently waiting.
metaMuxWaiters = metric.Metadata{
Name: "server.controller.mux_virtual_cluster_wait.waiters",
Help: "Current number of SQL connections waiting for the default virtual cluster to become routable",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
// metaMuxWaitAdmitted describes the counter of connections allowed to wait.
metaMuxWaitAdmitted = metric.Metadata{
Name: "server.controller.mux_virtual_cluster_wait.admitted",
Help: "Number of SQL connections admitted to wait for the default virtual cluster to become routable",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
// metaMuxWaitRejected describes the counter of connections turned away
// because the concurrent-waiter limit had been reached.
metaMuxWaitRejected = metric.Metadata{
Name: "server.controller.mux_virtual_cluster_wait.rejected",
Help: "Number of SQL connections rejected because too many connections were already waiting for the default virtual cluster",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
// metaMuxWaitSuccess describes the counter of waits that ended with a
// routable server.
metaMuxWaitSuccess = metric.Metadata{
Name: "server.controller.mux_virtual_cluster_wait.success",
Help: "Number of SQL connections that successfully waited for the default virtual cluster to become routable",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
// metaMuxWaitTimeout describes the counter of waits that hit the timeout.
metaMuxWaitTimeout = metric.Metadata{
Name: "server.controller.mux_virtual_cluster_wait.timeout",
Help: "Number of SQL connections that timed out while waiting for the default virtual cluster to become routable",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
// metaMuxWaitCanceled describes the counter of waits that were canceled.
metaMuxWaitCanceled = metric.Metadata{
Name: "server.controller.mux_virtual_cluster_wait.canceled",
Help: "Number of SQL connections whose wait for the default virtual cluster was canceled",
Measurement: "Connections",
Unit: metric.Unit_COUNT,
}
)

// serverControllerMetrics groups the metrics the server controller maintains
// for SQL connections that wait for the default virtual cluster to become
// routable.
type serverControllerMetrics struct {
// Waiters is the number of connections currently waiting. It is incremented
// when a waiter is admitted and decremented when that waiter is released.
Waiters *metric.Gauge
// Admitted counts connections that were allowed to start waiting.
Admitted *metric.Counter
// Rejected counts connections refused because the number of concurrent
// waiters had already reached the configured limit.
Rejected *metric.Counter
// Success counts waits that completed without error.
Success *metric.Counter
// Timeout counts failed waits whose error carries the wait-timeout mark
// while the caller's context is still live.
Timeout *metric.Counter
// Canceled counts failed waits where the caller's context had an error by
// the time the result was observed; this takes precedence over Timeout.
Canceled *metric.Counter
}

// Compile-time check that serverControllerMetrics satisfies metric.Struct.
var _ metric.Struct = (*serverControllerMetrics)(nil)

// MetricStruct is the marker method required by the metric.Struct interface.
func (*serverControllerMetrics) MetricStruct() {}

// makeServerControllerMetrics allocates the gauge and counters described by
// the mux-wait metadata and returns them as one struct. When registry is
// non-nil the struct is also added to it; a nil registry is tolerated, in
// which case the metrics exist but are not registered anywhere.
func makeServerControllerMetrics(registry *metric.Registry) *serverControllerMetrics {
	metrics := new(serverControllerMetrics)
	metrics.Waiters = metric.NewGauge(metaMuxWaiters)
	metrics.Admitted = metric.NewCounter(metaMuxWaitAdmitted)
	metrics.Rejected = metric.NewCounter(metaMuxWaitRejected)
	metrics.Success = metric.NewCounter(metaMuxWaitSuccess)
	metrics.Timeout = metric.NewCounter(metaMuxWaitTimeout)
	metrics.Canceled = metric.NewCounter(metaMuxWaitCanceled)

	// Nothing to register against: hand back the standalone metrics.
	if registry == nil {
		return metrics
	}
	registry.AddMetricStruct(metrics)
	return metrics
}
54 changes: 52 additions & 2 deletions pkg/server/server_controller_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,24 @@ func (c *serverController) shouldWaitForTenantServer(name roachpb.TenantName) bo
return multitenant.WaitForClusterStartTimeout.Get(&c.st.SV) > 0
}

// tenantServerWaitTimeout is the concrete error type behind
// errTenantServerWaitTimeout. It carries no state.
type tenantServerWaitTimeout struct{}

// Error implements the error interface.
func (tenantServerWaitTimeout) Error() string { return "tenant server wait timeout" }

// errTenantServerWaitTimeout is the sentinel used to tag wait-timeout
// failures. waitForTenantServer attaches it with errors.Mark when the wait
// timer fires and later tests for it with errors.Is in order to bump the
// timeout metric.
var errTenantServerWaitTimeout error = tenantServerWaitTimeout{}

func (c *serverController) waitForTenantServer(
ctx context.Context, name roachpb.TenantName,
) (onDemandServer, error) {
if release, ok := c.tryAdmitTenantServerWaiter(ctx, name); ok {
defer release()
} else {
return nil, errors.Mark(
errors.Newf("server for tenant %q is starting; too many clients are already waiting", name),
errNoTenantServerRunning,
)
}

// Note that requests that come in after the first request may time out
// in less time than the WaitForClusterStartTimeout. This seems fine for
// now since cluster startup should be relatively quick and if it isn't,
Expand All @@ -142,18 +157,53 @@ func (c *serverController) waitForTenantServer(
select {
case <-waitCh:
case <-t.C:
log.Dev.Infof(ctx, "timed out waiting for server for %s to become available", name)
return nil, err
if c.muxTimeoutEvery.ShouldLog() {
log.Dev.Infof(ctx, "timed out waiting for server for %s to become available", name)
}
return nil, errors.Mark(err, errTenantServerWaitTimeout)
}
}
})
res := futureRes.WaitForResult(ctx)
if res.Err != nil {
switch {
case ctx.Err() != nil:
c.metrics.Canceled.Inc(1)
case errors.Is(res.Err, errTenantServerWaitTimeout):
c.metrics.Timeout.Inc(1)
}
return nil, res.Err
}
c.metrics.Success.Inc(1)
return res.Val.(onDemandServer), nil
}

// tryAdmitTenantServerWaiter attempts to reserve one of the limited slots for
// connections waiting on the default virtual cluster.
//
// The limit is read once from WaitForClusterStartMaxConcurrent. If the number
// of current waiters has reached it, the attempt is counted as rejected, a
// rate-limited log line is emitted, and (nil, false) is returned. Otherwise
// the waiter count is bumped with a compare-and-swap (retrying if another
// goroutine raced us), the admitted counter and waiters gauge are updated, and
// the returned release func undoes the reservation. The caller must invoke
// release exactly once.
func (c *serverController) tryAdmitTenantServerWaiter(
	ctx context.Context, name roachpb.TenantName,
) (release func(), ok bool) {
	maxWaiters := multitenant.WaitForClusterStartMaxConcurrent.Get(&c.st.SV)
	for {
		waiting := c.muxWaiters.Load()
		if waiting < maxWaiters {
			if !c.muxWaiters.CompareAndSwap(waiting, waiting+1) {
				// Lost a race with another admit/release; re-read and retry.
				continue
			}
			c.metrics.Admitted.Inc(1)
			c.metrics.Waiters.Inc(1)
			undo := func() {
				c.muxWaiters.Add(-1)
				c.metrics.Waiters.Dec(1)
			}
			return undo, true
		}

		// At or above the limit: fail fast rather than queueing.
		c.metrics.Rejected.Inc(1)
		if c.muxRejectEvery.ShouldLog() {
			log.Dev.Infof(ctx,
				"rejecting SQL connection for tenant %s; %d clients are already waiting for startup (limit %d)",
				name, waiting, maxWaiters)
		}
		return nil, false
	}
}

func (t *systemServerWrapper) handleCancel(
ctx context.Context, cancelKey pgwirecancel.BackendKeyData,
) {
Expand Down
Loading
Loading