diff --git a/pkg/sql/mem_limit_test.go b/pkg/sql/mem_limit_test.go index f24f8d722586..29040cad9688 100644 --- a/pkg/sql/mem_limit_test.go +++ b/pkg/sql/mem_limit_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -179,6 +180,38 @@ func TestStreamerTightBudget(t *testing.T) { // to have at most one request to be "in progress". sqlDB.Exec(t, fmt.Sprintf("SET distsql_workmem = '%dB'", blobSize)) + // Give the Streamer an accurate initial estimate of the response size (each + // row's blob is blobSize). Without this, the Streamer starts with its tiny + // default estimate (1KiB) and issues all lookup requests eagerly on the + // first pass. Those eager requests use AllowEmpty=true, so when a response + // exceeds the tiny TargetBytes the KV layer returns it empty with a resume + // span, and the Streamer re-issues it (observable as 9 KV gRPC calls for 5 + // rows: 5 eager + 4 resumes). During this eager/resume churn multiple full + // responses can transiently be held at once under unfavorable goroutine + // scheduling, pushing memory usage well above the expected ~2 MiB and + // flaking this assertion (see #170853). With an accurate initial estimate, + // the first head-of-line request alone exhausts the budget, so the Streamer + // deterministically keeps a single request in progress (5 KV gRPC calls, + // no resumes) - exactly the behavior this test means to verify. In + // production this estimate comes from optimizer stats; here we set it + // explicitly since the test table has no stats. + sqlDB.Exec(t, fmt.Sprintf( + "SET CLUSTER SETTING sql.distsql.streamer.initial_avg_response_size = '%dB'", blobSize, + )) + // Cluster settings propagate asynchronously, so wait until the new value is + // observable before running the measured query. + wantSetting := string(humanizeutil.IBytes(blobSize)) + testutils.SucceedsSoon(t, func() error { + var got string + sqlDB.QueryRow( + t, "SHOW CLUSTER SETTING sql.distsql.streamer.initial_avg_response_size", + ).Scan(&got) + if got != wantSetting { + return errors.Newf("setting not yet propagated: got %q, want %q", got, wantSetting) + } + return nil + }) + // Perform an index join to read the blobs. query := "EXPLAIN ANALYZE (VERBOSE) SELECT sum(length(blob)) FROM t@t_k_idx WHERE k = 1" maximumMemoryUsageRegex := regexp.MustCompile(`maximum memory usage: (\d+\.\d+) MiB`)