Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions pkg/sql/mem_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -179,6 +180,38 @@ func TestStreamerTightBudget(t *testing.T) {
// to have at most one request to be "in progress".
sqlDB.Exec(t, fmt.Sprintf("SET distsql_workmem = '%dB'", blobSize))

// Give the Streamer an accurate initial estimate of the response size (each
// row's blob is blobSize). Without this, the Streamer starts with its tiny
// default estimate (1KiB) and issues all lookup requests eagerly on the
// first pass. Those eager requests use AllowEmpty=true, so when a response
// exceeds the tiny TargetBytes the KV layer returns it empty with a resume
// span, and the Streamer re-issues it (observable as 9 KV gRPC calls for 5
// rows: 5 eager + 4 resumes). During this eager/resume churn multiple full
// responses can transiently be held at once under unfavorable goroutine
// scheduling, pushing memory usage well above the expected ~2 MiB and
// flaking this assertion (see #170853). With an accurate initial estimate,
// the first head-of-line request alone exhausts the budget, so the Streamer
// deterministically keeps a single request in progress (5 KV gRPC calls,
// no resumes) - exactly the behavior this test means to verify. In
// production this estimate comes from optimizer stats; here we set it
// explicitly since the test table has no stats.
sqlDB.Exec(t, fmt.Sprintf(
"SET CLUSTER SETTING sql.distsql.streamer.initial_avg_response_size = '%dB'", blobSize,
))
// Cluster settings propagate asynchronously, so wait until the new value is
// observable before running the measured query.
wantSetting := string(humanizeutil.IBytes(blobSize))
testutils.SucceedsSoon(t, func() error {
var got string
sqlDB.QueryRow(
t, "SHOW CLUSTER SETTING sql.distsql.streamer.initial_avg_response_size",
).Scan(&got)
if got != wantSetting {
return errors.Newf("setting not yet propagated: got %q, want %q", got, wantSetting)
}
return nil
})

// Perform an index join to read the blobs.
query := "EXPLAIN ANALYZE (VERBOSE) SELECT sum(length(blob)) FROM t@t_k_idx WHERE k = 1"
maximumMemoryUsageRegex := regexp.MustCompile(`maximum memory usage: (\d+\.\d+) MiB`)
Expand Down
Loading