From 59c986fa9ef30628164bd4fef77a24a684bdacb8 Mon Sep 17 00:00:00 2001 From: shankeleven Date: Sat, 30 May 2026 03:45:46 +0530 Subject: [PATCH] server: add admission control for tenant startup wait queue When a SQL connection arrives for a default virtual cluster whose tenant server is still initializing, the server controller may wait for the tenant server to become available before accepting the connection. Previously, an unbounded number of connections could wait concurrently, allowing a bootstrapping tenant to consume excessive resources through open TCP connections. Add admission control to limit the number of concurrent waiters and reject excess connections immediately. Introduce metrics for wait queue observability and rate-limit the timeout/rejection log messages. To encourage clients to retry, change the routing error sent to clients from `pgcode.ConnectionException` (SQLSTATE 08000) to `pgcode.CannotConnectNow` (SQLSTATE 57P03) and extend its hint to suggest retrying later if the target tenant is starting. Also clarify documentation around DataStateReady and tenant runtime readiness. Relates to: #154857 Release note (ops change): Added cluster setting `server.controller.mux_virtual_cluster_wait.max_concurrent` (default 10) to limit the number of SQL connections that may wait concurrently for a default virtual cluster to become available. Connections above the limit are rejected immediately; setting the limit to 0 rejects every connection that would otherwise wait. Added metrics under `server.controller.mux_virtual_cluster_wait.*` for wait queue observability. The "service unavailable for target tenant" routing error now uses SQLSTATE 57P03 (`cannot_connect_now`) instead of 08000 (`connection_exception`). 
--- pkg/internal/metricscan/metric_owners.yaml | 6 + pkg/multitenant/mtinfopb/info.go | 5 +- pkg/multitenant/tenant_config.go | 16 ++- pkg/server/BUILD.bazel | 4 + pkg/server/server.go | 1 + pkg/server/server_controller.go | 10 ++ .../server_controller_channel_orchestrator.go | 10 +- pkg/server/server_controller_metrics.go | 75 ++++++++++ pkg/server/server_controller_sql.go | 54 ++++++- pkg/server/server_controller_test.go | 136 ++++++++++++++++++ pkg/sql/pgwire/pre_serve.go | 4 +- 11 files changed, 314 insertions(+), 7 deletions(-) create mode 100644 pkg/server/server_controller_metrics.go diff --git a/pkg/internal/metricscan/metric_owners.yaml b/pkg/internal/metricscan/metric_owners.yaml index a962031b6b47..3089af0eaa93 100644 --- a/pkg/internal/metricscan/metric_owners.yaml +++ b/pkg/internal/metricscan/metric_owners.yaml @@ -899,6 +899,12 @@ owners: security_certificate_ttl_node_client: cockroachdb/security-engineering security_certificate_ttl_ui: cockroachdb/security-engineering security_certificate_ttl_ui_ca: cockroachdb/security-engineering + server_controller_mux_virtual_cluster_wait_admitted: cockroachdb/server + server_controller_mux_virtual_cluster_wait_canceled: cockroachdb/server + server_controller_mux_virtual_cluster_wait_rejected: cockroachdb/server + server_controller_mux_virtual_cluster_wait_success: cockroachdb/server + server_controller_mux_virtual_cluster_wait_timeout: cockroachdb/server + server_controller_mux_virtual_cluster_wait_waiters: cockroachdb/server server_http_request_duration_nanos: cockroachdb/unowned spanconfig_kvsubscriber_oldest_protected_record_nanos: cockroachdb/kv spanconfig_kvsubscriber_protected_record_count: cockroachdb/kv diff --git a/pkg/multitenant/mtinfopb/info.go b/pkg/multitenant/mtinfopb/info.go index 93de7697aa14..a957fa9f0021 100644 --- a/pkg/multitenant/mtinfopb/info.go +++ b/pkg/multitenant/mtinfopb/info.go @@ -72,7 +72,10 @@ const ( // DataStateAdd indicates tenant data is being added. 
Not available // for SQL sessions. DataStateAdd TenantDataState = 0 - // DataStateReady indicates data is ready and SQL servers can access it. + // DataStateReady indicates that the tenant keyspace is eligible for + // tenant-server startup/bootstrap. It does not imply that any particular SQL + // server instance has completed runtime initialization or is routable for + // queries. DataStateReady TenantDataState = 1 // DataStateDrop indicates tenant data is being deleted. Not // available for SQL sessions. diff --git a/pkg/multitenant/tenant_config.go b/pkg/multitenant/tenant_config.go index 4f0cba58c529..34a70fa8024e 100644 --- a/pkg/multitenant/tenant_config.go +++ b/pkg/multitenant/tenant_config.go @@ -29,10 +29,24 @@ var DefaultTenantSelect = settings.RegisterStringSetting( // WaitForClusterStartTimeout is the amount of time the tenant // controller will wait for the default virtual cluster to have an -// active SQL server. +// active, routable SQL server. The tenant's durable data_state=ready +// state only makes it eligible for startup; this timeout waits for +// runtime SQL-serving readiness. var WaitForClusterStartTimeout = settings.RegisterDurationSetting( settings.SystemOnly, "server.controller.mux_virtual_cluster_wait.timeout", "amount of time to wait for a default virtual cluster to become available when serving SQL connections (0 to disable)", 10*time.Second, ) + +// WaitForClusterStartMaxConcurrent is the maximum number of SQL connections +// that may wait concurrently for the default virtual cluster to become +// routable. Connections above this limit fail fast to avoid holding unbounded +// TCP connections while a tenant is starting. 
+var WaitForClusterStartMaxConcurrent = settings.RegisterIntSetting( + settings.SystemOnly, + "server.controller.mux_virtual_cluster_wait.max_concurrent", + "maximum number of SQL connections that may wait concurrently for the default virtual cluster to become available", + 10, + settings.IntWithMinimum(0), +) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 64987c064279..08e6af05eb6b 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -56,6 +56,7 @@ go_library( "server_controller.go", "server_controller_channel_orchestrator.go", "server_controller_http.go", + "server_controller_metrics.go", "server_controller_new_server.go", "server_controller_sql.go", "server_http.go", @@ -569,6 +570,7 @@ go_test( "//pkg/server/authserver", "//pkg/server/privchecker", "//pkg/server/rangetestutils", + "//pkg/server/serverctl", "//pkg/server/serverpb", "//pkg/server/srvtestutils", "//pkg/server/status", @@ -587,6 +589,7 @@ go_test( "//pkg/sql/execinfrapb", "//pkg/sql/isql", "//pkg/sql/pgwire", + "//pkg/sql/pgwire/pgwirecancel", "//pkg/sql/roleoption", "//pkg/sql/sem/catconstants", "//pkg/sql/sem/tree", @@ -634,6 +637,7 @@ go_test( "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/stop", + "//pkg/util/syncutil/singleflight", "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", diff --git a/pkg/server/server.go b/pkg/server/server.go index 3973311b9278..82bfae7b424b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1381,6 +1381,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf systemTenantNameContainer, pgPreServer.SendRoutingError, tenantCapabilitiesWatcher, + appRegistry, cfg.DisableSQLServer, cfg.BaseConfig.DisableTLSForHTTP, cfg.Insecure, diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index 9304c60079be..ade355d7e8b8 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -27,6 +27,7 @@ import ( 
"github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -84,6 +85,11 @@ type serverController struct { sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName) tenantWaiter *singleflight.Group + metrics *serverControllerMetrics + + muxWaiters atomic.Int64 + muxRejectEvery log.EveryN + muxTimeoutEvery log.EveryN // draining is set when the surrounding server starts draining, and // prevents further creation of new tenant servers. @@ -136,6 +142,7 @@ func newServerController( systemTenantNameContainer *roachpb.TenantNameContainer, sendSQLRoutingError func(ctx context.Context, conn net.Conn, tenantName roachpb.TenantName), watcher *tenantcapabilitieswatcher.Watcher, + registry *metric.Registry, disableSQLServer bool, disableTLSForHTTP bool, insecure bool, @@ -150,10 +157,13 @@ func newServerController( sendSQLRoutingError: sendSQLRoutingError, watcher: watcher, tenantWaiter: singleflight.NewGroup("tenant server poller", "poll"), + metrics: makeServerControllerMetrics(registry), drainCh: make(chan struct{}), disableSQLServer: disableSQLServer, disableTLSForHTTP: disableTLSForHTTP, insecure: insecure, + muxRejectEvery: log.Every(10 * time.Second), + muxTimeoutEvery: log.Every(10 * time.Second), } c.orchestrator = newChannelOrchestrator(parentStopper, c) c.mu.servers = map[roachpb.TenantName]*serverState{ diff --git a/pkg/server/server_controller_channel_orchestrator.go b/pkg/server/server_controller_channel_orchestrator.go index 33165062e929..86b7771a1d40 100644 --- a/pkg/server/server_controller_channel_orchestrator.go +++ b/pkg/server/server_controller_channel_orchestrator.go @@ -80,7 +80,11 @@ type serverState struct { // 
server is the server that is being controlled. // This is only set when the corresponding orchestratedServer - // instance is ready. + // instance is ready to accept SQL traffic. For tenant servers, this is + // the runtime routable boundary: startControlledServer only publishes the + // server here after preStart() and acceptClients() have both succeeded. + // Durable tenant state such as data_state=ready only makes a tenant + // eligible for startup; it does not populate this field. server orchestratedServer } @@ -390,6 +394,10 @@ func (o *channelOrchestrator) startControlledServer( return nil, errors.Wrap(err, "while starting graceful drain propagation task") } + // Publishing into state.startedMu.server below is the routing + // boundary. Keep acceptClients before that publication so SQL + // routing cannot observe a tenant server until it is actually + // ready to serve client connections. return s, errors.Wrap(s.acceptClients(startCtx), "while accepting clients") }() if err != nil { diff --git a/pkg/server/server_controller_metrics.go b/pkg/server/server_controller_metrics.go new file mode 100644 index 000000000000..29ac4db6bad6 --- /dev/null +++ b/pkg/server/server_controller_metrics.go @@ -0,0 +1,75 @@ +// Copyright 2026 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package server + +import "github.com/cockroachdb/cockroach/pkg/util/metric" + +var ( + metaMuxWaiters = metric.Metadata{ + Name: "server.controller.mux_virtual_cluster_wait.waiters", + Help: "Current number of SQL connections waiting for the default virtual cluster to become routable", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaMuxWaitAdmitted = metric.Metadata{ + Name: "server.controller.mux_virtual_cluster_wait.admitted", + Help: "Number of SQL connections admitted to wait for the default virtual cluster to become routable", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaMuxWaitRejected = metric.Metadata{ + Name: "server.controller.mux_virtual_cluster_wait.rejected", + Help: "Number of SQL connections rejected because too many connections were already waiting for the default virtual cluster", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaMuxWaitSuccess = metric.Metadata{ + Name: "server.controller.mux_virtual_cluster_wait.success", + Help: "Number of SQL connections that successfully waited for the default virtual cluster to become routable", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaMuxWaitTimeout = metric.Metadata{ + Name: "server.controller.mux_virtual_cluster_wait.timeout", + Help: "Number of SQL connections that timed out while waiting for the default virtual cluster to become routable", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } + metaMuxWaitCanceled = metric.Metadata{ + Name: "server.controller.mux_virtual_cluster_wait.canceled", + Help: "Number of SQL connections whose wait for the default virtual cluster was canceled", + Measurement: "Connections", + Unit: metric.Unit_COUNT, + } +) + +type serverControllerMetrics struct { + Waiters *metric.Gauge + Admitted *metric.Counter + Rejected *metric.Counter + Success *metric.Counter + Timeout *metric.Counter + Canceled *metric.Counter +} + +var _ metric.Struct = (*serverControllerMetrics)(nil) + +func 
(*serverControllerMetrics) MetricStruct() {} + +func makeServerControllerMetrics(registry *metric.Registry) *serverControllerMetrics { + m := &serverControllerMetrics{ + Waiters: metric.NewGauge(metaMuxWaiters), + Admitted: metric.NewCounter(metaMuxWaitAdmitted), + Rejected: metric.NewCounter(metaMuxWaitRejected), + Success: metric.NewCounter(metaMuxWaitSuccess), + Timeout: metric.NewCounter(metaMuxWaitTimeout), + Canceled: metric.NewCounter(metaMuxWaitCanceled), + } + if registry != nil { + registry.AddMetricStruct(m) + } + return m +} diff --git a/pkg/server/server_controller_sql.go b/pkg/server/server_controller_sql.go index cf0b5ddea1ae..97f86fa0c2b5 100644 --- a/pkg/server/server_controller_sql.go +++ b/pkg/server/server_controller_sql.go @@ -120,9 +120,24 @@ func (c *serverController) shouldWaitForTenantServer(name roachpb.TenantName) bo return multitenant.WaitForClusterStartTimeout.Get(&c.st.SV) > 0 } +type tenantServerWaitTimeout struct{} + +func (tenantServerWaitTimeout) Error() string { return "tenant server wait timeout" } + +var errTenantServerWaitTimeout error = tenantServerWaitTimeout{} + func (c *serverController) waitForTenantServer( ctx context.Context, name roachpb.TenantName, ) (onDemandServer, error) { + if release, ok := c.tryAdmitTenantServerWaiter(ctx, name); ok { + defer release() + } else { + return nil, errors.Mark( + errors.Newf("server for tenant %q is starting; too many clients are already waiting", name), + errNoTenantServerRunning, + ) + } + // Note that requests that come in after the first request may time out // in less time than the WaitForClusterStartTimeout. 
This seems fine for // now since cluster startup should be relatively quick and if it isn't, @@ -141,18 +156,53 @@ func (c *serverController) waitForTenantServer( select { case <-waitCh: case <-t.C: - log.Dev.Infof(ctx, "timed out waiting for server for %s to become available", name) - return nil, err + if c.muxTimeoutEvery.ShouldLog() { + log.Dev.Infof(ctx, "timed out waiting for server for %s to become available", name) + } + return nil, errors.Mark(err, errTenantServerWaitTimeout) } } }) res := futureRes.WaitForResult(ctx) if res.Err != nil { + switch { + case ctx.Err() != nil: + c.metrics.Canceled.Inc(1) + case errors.Is(res.Err, errTenantServerWaitTimeout): + c.metrics.Timeout.Inc(1) + } return nil, res.Err } + c.metrics.Success.Inc(1) return res.Val.(onDemandServer), nil } +func (c *serverController) tryAdmitTenantServerWaiter( + ctx context.Context, name roachpb.TenantName, +) (release func(), ok bool) { + limit := multitenant.WaitForClusterStartMaxConcurrent.Get(&c.st.SV) + for { + cur := c.muxWaiters.Load() + if cur >= limit { + c.metrics.Rejected.Inc(1) + if c.muxRejectEvery.ShouldLog() { + log.Dev.Infof(ctx, + "rejecting SQL connection for tenant %s; %d clients are already waiting for startup (limit %d)", + name, cur, limit) + } + return nil, false + } + if c.muxWaiters.CompareAndSwap(cur, cur+1) { + c.metrics.Admitted.Inc(1) + c.metrics.Waiters.Inc(1) + return func() { + c.muxWaiters.Add(-1) + c.metrics.Waiters.Dec(1) + }, true + } + } +} + func (t *systemServerWrapper) handleCancel( ctx context.Context, cancelKey pgwirecancel.BackendKeyData, ) { diff --git a/pkg/server/server_controller_test.go b/pkg/server/server_controller_test.go index 99d73ca4f075..85c54e8daa40 100644 --- a/pkg/server/server_controller_test.go +++ b/pkg/server/server_controller_test.go @@ -7,24 +7,160 @@ package server import ( "context" + "net" + "net/http" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/multitenant" 
"github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverctl" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +type fakeOnDemandServer struct{} + +var _ onDemandServer = fakeOnDemandServer{} + +func (fakeOnDemandServer) annotateCtx(ctx context.Context) context.Context { return ctx } +func (fakeOnDemandServer) preStart(context.Context) error { return nil } +func (fakeOnDemandServer) acceptClients(context.Context) error { return nil } +func (fakeOnDemandServer) shutdownRequested() <-chan serverctl.ShutdownRequest { + return nil +} +func (fakeOnDemandServer) gracefulDrain( + context.Context, bool, +) (uint64, redact.RedactableString, error) { + return 0, "", nil +} +func (fakeOnDemandServer) getTenantID() roachpb.TenantID { return roachpb.SystemTenantID } +func (fakeOnDemandServer) getInstanceID() base.SQLInstanceID { return 1 } +func (fakeOnDemandServer) getHTTPHandlerFn() http.HandlerFunc { return nil } +func (fakeOnDemandServer) handleCancel(context.Context, pgwirecancel.BackendKeyData) {} +func (fakeOnDemandServer) serveConn(context.Context, net.Conn, pgwire.PreServeStatus) error { + return nil +} +func (fakeOnDemandServer) 
getSQLAddr() string { return "" } +func (fakeOnDemandServer) getRPCAddr() string { return "" } + +func TestServerControllerTenantWaiterAdmission(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + reg := metric.NewRegistry() + sc := &serverController{ + st: st, + metrics: makeServerControllerMetrics(reg), + muxRejectEvery: log.Every(time.Second), + } + + multitenant.WaitForClusterStartMaxConcurrent.Override(ctx, &st.SV, 1) + + release, ok := sc.tryAdmitTenantServerWaiter(ctx, "hello") + require.True(t, ok) + require.Equal(t, int64(1), sc.muxWaiters.Load()) + require.Equal(t, int64(1), sc.metrics.Waiters.Value()) + require.Equal(t, int64(1), sc.metrics.Admitted.Count()) + + rejectedRelease, ok := sc.tryAdmitTenantServerWaiter(ctx, "hello") + require.False(t, ok) + require.Nil(t, rejectedRelease) + require.Equal(t, int64(1), sc.muxWaiters.Load()) + require.Equal(t, int64(1), sc.metrics.Rejected.Count()) + + release() + require.Equal(t, int64(0), sc.muxWaiters.Load()) + require.Equal(t, int64(0), sc.metrics.Waiters.Value()) + + release, ok = sc.tryAdmitTenantServerWaiter(ctx, "hello") + require.True(t, ok) + release() +} + +func TestServerControllerGetServerRequiresRuntimeReadyServer(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + startedOrStoppedCh := make(chan struct{}) + sc := &serverController{} + sc.mu.servers = map[roachpb.TenantName]*serverState{ + "hello": { + nc: roachpb.NewTenantNameContainer("hello"), + startedOrStoppedCh: startedOrStoppedCh, + }, + } + sc.mu.newServerCh = make(chan struct{}) + + srv, waitCh, err := sc.getServer(ctx, "hello") + require.Nil(t, srv) + require.Error(t, err) + require.True(t, errors.Is(err, errNoTenantServerRunning)) + require.Equal(t, (<-chan struct{})(startedOrStoppedCh), waitCh) + + entry := sc.mu.servers["hello"] + entry.startedMu.Lock() + entry.startedMu.server = fakeOnDemandServer{} + entry.startedMu.Unlock() + + 
srv, _, err = sc.getServer(ctx, "hello") + require.NoError(t, err) + require.NotNil(t, srv) +} + +func TestServerControllerTenantWaiterTimeoutMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + multitenant.WaitForClusterStartTimeout.Override(ctx, &st.SV, time.Nanosecond) + multitenant.WaitForClusterStartMaxConcurrent.Override(ctx, &st.SV, 1) + + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + sc := &serverController{ + st: st, + stopper: stopper, + tenantWaiter: singleflight.NewGroup("tenant server poller", "poll"), + metrics: makeServerControllerMetrics(metric.NewRegistry()), + muxRejectEvery: log.Every(time.Second), + muxTimeoutEvery: log.Every(time.Second), + } + sc.mu.servers = map[roachpb.TenantName]*serverState{ + "hello": { + nc: roachpb.NewTenantNameContainer("hello"), + startedOrStoppedCh: make(chan struct{}), + }, + } + sc.mu.newServerCh = make(chan struct{}) + + srv, err := sc.waitForTenantServer(ctx, "hello") + require.Nil(t, srv) + require.Error(t, err) + require.True(t, errors.Is(err, errTenantServerWaitTimeout)) + require.Equal(t, int64(1), sc.metrics.Admitted.Count()) + require.Equal(t, int64(1), sc.metrics.Timeout.Count()) + require.Equal(t, int64(0), sc.metrics.Waiters.Value()) + require.Equal(t, int64(0), sc.muxWaiters.Load()) +} + func TestServerController(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/pgwire/pre_serve.go b/pkg/sql/pgwire/pre_serve.go index 78117ea4ecc8..cf12684f13e5 100644 --- a/pkg/sql/pgwire/pre_serve.go +++ b/pkg/sql/pgwire/pre_serve.go @@ -217,9 +217,9 @@ func (s *PreServeConnHandler) SendRoutingError( ctx context.Context, conn net.Conn, tenantName roachpb.TenantName, ) { err := errors.WithHint( - pgerror.Newf(pgcode.ConnectionException, + pgerror.Newf(pgcode.CannotConnectNow, "service unavailable for target tenant (%v)", tenantName), - `Double check your "-ccluster=" connection option 
or your "cluster:" database name prefix.`) + `Double check your "-ccluster=" connection option or your "cluster:" database name prefix. If the target tenant is starting, retry later.`) _ = s.sendErr(ctx, s.st, conn, err) }