Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b50ca84
core,api,xds: Implement load balancing policy delay plumbing
AgraVator May 13, 2026
c38ce1d
fix: tests
AgraVator May 14, 2026
a992bdf
fix: minor changes
AgraVator May 19, 2026
6a55ff2
add missing endDelay()
AgraVator Jun 8, 2026
389b96f
core,api,rls,util,xds: Implement dual Load Balancer delay APIs and ca…
AgraVator Jun 19, 2026
5e56f38
core: Add 100% test coverage for dual LB delay APIs and cadence rules
AgraVator Jun 19, 2026
6a3572b
opentelemetry: Implement dual Load Balancer delay spans and metrics
AgraVator Jun 22, 2026
9a4f21f
Merge remote-tracking branch 'upstream/master' into lb-policy-delay
AgraVator Jun 22, 2026
69b353e
opentelemetry: Implement Attempt-Level RPC delay observability and ac…
AgraVator Jun 23, 2026
af69f94
core, opentelemetry, rls, xds, util: Enrich delay_reason diagnostic m…
AgraVator Jun 23, 2026
3584743
core: Use CONNECTING_RESULT in PickFirstLoadBalancer to fix unused va…
AgraVator Jun 23, 2026
e3727db
util: Use CONNECTING_RESULT in RoundRobinLoadBalancer to fix unused v…
AgraVator Jun 23, 2026
e239b6c
core: Align channel startup queuing delay type to connecting per gRFC…
AgraVator Jun 23, 2026
5fcca43
core: Unify initial channel startup queuing delay reason to 'client c…
AgraVator Jun 23, 2026
c692f73
core, opentelemetry, xds: Replace mock tracers and pickers in delay t…
AgraVator Jun 24, 2026
a24b798
opentelemetry: Remove mock delay tracing test in favor of real SDK sp…
AgraVator Jun 24, 2026
9a0d168
opentelemetry: Align attempt delay duration metric attributes with gR…
AgraVator Jun 24, 2026
10fc3ae
opentelemetry, rls, xds: Refine delay attributes and add comprehensiv…
AgraVator Jun 24, 2026
df7700d
opentelemetry: Fix unused Checkstyle test imports (Proposal A121)
AgraVator Jun 24, 2026
5245f15
opentelemetry: Add real channel E2E delay tracing test via InProcessC…
AgraVator Jun 24, 2026
0ca843d
opentelemetry: Add real channel E2E delay tracing & metrics tests via…
AgraVator Jun 24, 2026
6253e46
opentelemetry: Use non-deprecated setAddressesOrError in E2E delay te…
AgraVator Jun 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat
public void createPendingStream() {
}

/**
 * Called when a delay segment starts during load balancing, identified by a reason token.
 * The default implementation does nothing.
 *
 * @param reasonToken the reason for the delay, e.g., "pick_first:connecting"
 * @since 1.82.0
 */
public void delayStarted(String reasonToken) {
}

/**
 * Called when the current delay segment, previously reported through
 * {@link #delayStarted}, has ended. The default implementation does nothing.
 *
 * @since 1.82.0
 */
public void delayEnded() {
}

/**
* Headers have been sent to the socket.
*/
Expand Down
31 changes: 26 additions & 5 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -549,25 +549,30 @@ public static final class PickResult {
// True if the result is created by withDrop()
private final boolean drop;
@Nullable private final String authorityOverride;
@Nullable private final String delayReasonToken;

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop) {
this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory;
this.status = checkNotNull(status, "status");
this.drop = drop;
this.authorityOverride = null;
this(subchannel, streamTracerFactory, status, drop, null, null);
}

// Overload for results that carry an authority override but no delay reason token:
// delegates to the six-argument constructor with a null token.
private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop, @Nullable String authorityOverride) {
this(subchannel, streamTracerFactory, status, drop, authorityOverride, null);
}

/**
 * Full constructor that the shorter overloads delegate to.
 *
 * @param subchannel the picked subchannel, or {@code null}
 * @param streamTracerFactory optional tracer factory, or {@code null}
 * @param status the pick status; must not be {@code null}
 * @param drop whether this result was created by {@code withDrop()}
 * @param authorityOverride optional authority override, or {@code null}
 * @param delayReasonToken optional reason the RPC is being delayed, or {@code null}
 */
private PickResult(
    @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
    Status status, boolean drop, @Nullable String authorityOverride,
    @Nullable String delayReasonToken) {
  // Validate the only mandatory argument up front, then record the rest as given.
  this.status = checkNotNull(status, "status");
  this.subchannel = subchannel;
  this.streamTracerFactory = streamTracerFactory;
  this.drop = drop;
  this.authorityOverride = authorityOverride;
  this.delayReasonToken = delayReasonToken;
}

/**
Expand Down Expand Up @@ -727,6 +732,22 @@ public static PickResult withNoResult() {
return NO_RESULT;
}

/**
 * Creates a result indicating that no decision could be made, so the RPC stays buffered,
 * and attaches the reason for the delay.
 *
 * @param delayReasonToken the reason the RPC is being delayed, e.g.
 *     "pick_first:connecting"; must not be {@code null}
 * @since 1.82.0
 */
public static PickResult withNoResult(String delayReasonToken) {
  String token = Preconditions.checkNotNull(delayReasonToken, "delayReasonToken");
  return new PickResult(
      /* subchannel= */ null,
      /* streamTracerFactory= */ null,
      Status.OK,
      /* drop= */ false,
      /* authorityOverride= */ null,
      token);
}

// NOTE(review): the neighbouring getAuthorityOverride() accessor is annotated
// @ExperimentalApi and withNoResult(String) carries an @since tag; confirm whether this new
// public accessor should get the same treatment.
/**
 * Returns the delay reason token supplied to {@link #withNoResult(String)}, or {@code null}
 * if this result was created without one.
 */
@Nullable
public String getDelayReasonToken() {
return delayReasonToken;
}

/** Returns the authority override if any. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656")
@Nullable
Expand Down
48 changes: 43 additions & 5 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public final ClientStream newStream(
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers, pickResult);
String token = pickResult != null ? pickResult.getDelayReasonToken() : null;
return createPendingStream(args, tracers, pickResult, token);
}
state = newerState;
}
Expand All @@ -173,8 +174,8 @@ public final ClientStream newStream(
*/
@GuardedBy("lock")
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
PickResult pickResult) {
PendingStream pendingStream = new PendingStream(args, tracers);
PickResult pickResult, @Nullable String delayReasonToken) {
PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken);
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
pendingStream.lastPickStatus = pickResult.getStatus();
}
Expand Down Expand Up @@ -303,6 +304,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
stream.endDelay();
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
Expand All @@ -315,7 +317,9 @@ final void reprocess(@Nullable SubchannelPicker picker) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
} else { // stay pending
stream.updateDelayReason(pickResult.getDelayReasonToken());
}
}

synchronized (lock) {
Expand Down Expand Up @@ -361,11 +365,44 @@ private class PendingStream extends DelayedStream {
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private volatile Status lastPickStatus;
@Nullable private String delayReasonToken;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
@Nullable String initialToken) {
super("connecting_and_lb");
this.args = args;
this.tracers = tracers;
this.delayReasonToken = initialToken;
if (initialToken != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayStarted(initialToken);
}
}
}

/**
 * Switches this pending stream to a new delay reason and notifies every tracer.
 *
 * <p>Does nothing when {@code newToken} equals the current token. Otherwise the currently
 * open segment (if any) is closed with {@code delayEnded()}, the token is replaced, and,
 * when {@code newToken} is non-null, a new segment is opened with
 * {@code delayStarted(newToken)}.
 *
 * @param newToken the reason reported by the latest pick, or {@code null} if the pick
 *     carried no reason
 */
void updateDelayReason(@Nullable String newToken) {
  // Same reason as before (including null -> null): the open segment simply continues.
  if (java.util.Objects.equals(delayReasonToken, newToken)) {
    return;
  }
  // Close the segment that was open under the previous reason.
  if (delayReasonToken != null) {
    for (ClientStreamTracer tracer : tracers) {
      tracer.delayEnded();
    }
  }
  delayReasonToken = newToken;
  // A null token means "still pending, but no reason to report", so nothing is opened.
  if (newToken != null) {
    for (ClientStreamTracer tracer : tracers) {
      tracer.delayStarted(newToken);
    }
  }
}

/**
 * Closes the delay segment currently open on this pending stream, if there is one, by
 * calling {@code delayEnded()} on every tracer and clearing the stored token. Calling it
 * again without an intervening new segment is a no-op.
 */
void endDelay() {
  // NOTE(review): delayReasonToken is a plain (non-volatile) field and this method is
  // reached from both reprocess() and cancel(); confirm those cannot run concurrently.
  if (delayReasonToken == null) {
    return;
  }
  for (ClientStreamTracer tracer : tracers) {
    tracer.delayEnded();
  }
  delayReasonToken = null;
}

/** Runnable may be null. */
Expand All @@ -391,6 +428,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve

@Override
public void cancel(Status reason) {
endDelay();
super.cancel(reason);
synchronized (lock) {
if (reportTransportTerminated != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public void createPendingStream() {
delegate().createPendingStream();
}

// Forwards the delay-start notification and its reason token to the delegate tracer.
@Override
public void delayStarted(String reasonToken) {
delegate().delayStarted(reasonToken);
}

// Forwards the delay-end notification to the delegate tracer.
@Override
public void delayEnded() {
delegate().delayEnded();
}

@Override
public void outboundHeaders() {
delegate().outboundHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
* list and sticking to the first that works.
*/
final class PickFirstLoadBalancer extends LoadBalancer {
// Shared no-result pick used by the pickers published in the CONNECTING state; it carries
// the "pick_first:connecting" delay reason token.
private static final PickResult CONNECTING_RESULT =
PickResult.withNoResult("pick_first:connecting");
private final Helper helper;
private Subchannel subchannel;
private ConnectivityState currentState = IDLE;
Expand Down Expand Up @@ -83,7 +85,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {

// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTING and call subchannel.requestConnection() immediately.
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
Expand Down Expand Up @@ -135,7 +137,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
case CONNECTING:
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
// the current picker in-place. But ignoring the potential optimization is simpler.
picker = new FixedResultPicker(PickResult.withNoResult());
picker = new FixedResultPicker(CONNECTING_RESULT);
break;
case READY:
picker = new FixedResultPicker(PickResult.withSubchannel(subchannel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,55 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure(
+ " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]");
}

// Verifies the order of tracer callbacks for a pending stream: a delay segment is opened
// with the first picker's reason token, swapped for a new segment when a later picker
// reports a different token, and closed after the final reprocess.
@Test
public void streamDelayMetrics() {
ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer };

// Picker that yields no result and tags the delay as "pick_first:connecting".
SubchannelPicker connectingPicker = mock(SubchannelPicker.class);
when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult("pick_first:connecting"));

// Install the picker before creating the stream so the new stream starts out delayed.
delayedTransport.reprocess(connectingPicker);
delayedTransport.newStream(method, headers, callOptions, customTracers);

InOrder inOrder = inOrder(mockTracer);
inOrder.verify(mockTracer).delayStarted("pick_first:connecting");

// Second picker still yields no result, but with a different reason token.
SubchannelPicker customDelayPicker = mock(SubchannelPicker.class);
when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult("rls:lookup_pending"));

delayedTransport.reprocess(customDelayPicker);

// The token change must close the old segment before opening the new one.
inOrder.verify(mockTracer).delayEnded();
inOrder.verify(mockTracer).delayStarted("rls:lookup_pending");

// mockPicker is a fixture defined outside this method; presumably its pick lets the
// stream leave the pending state, which is what ends the delay. TODO confirm in setUp.
delayedTransport.reprocess(mockPicker);

inOrder.verify(mockTracer).delayEnded();
}

// Verifies that cancelling a stream that is still pending closes its open delay segment.
@Test
public void streamDelayMetrics_cancelled() {
ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer };

// Picker that yields no result and tags the delay as "pick_first:connecting".
SubchannelPicker connectingPicker = mock(SubchannelPicker.class);
when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult("pick_first:connecting"));

delayedTransport.reprocess(connectingPicker);
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers);
stream.start(streamListener);

verify(mockTracer).delayStarted("pick_first:connecting");

// Cancelling while still pending is expected to report the end of the delay.
stream.cancel(Status.CANCELLED);

verify(mockTracer).delayEnded();
}

private static TransportProvider newTransportProvider(final ClientTransport transport) {
return new TransportProvider() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception {
verify(mockSubchannel).requestConnection();

// Calling pickSubchannel() twice gave the same result
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs));
PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting");
assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs));

verifyNoMoreInteractions(mockHelper);
}
Expand Down
2 changes: 1 addition & 1 deletion rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
} else {
return PickResult.withNoResult();
return PickResult.withNoResult("rls:lookup_pending");
}
}

Expand Down
2 changes: 2 additions & 0 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getSubchannel()).isNull();
assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending");
// Cache is warm, but still unconnected
res = picker.pickSubchannel(searchSubchannelArgs);
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
Expand Down Expand Up @@ -493,6 +494,7 @@ public void lb_working_withoutDefaultTarget() throws Exception {
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getSubchannel()).isNull();
assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending");
// Cache is warm, but still unconnected
res = picker.pickSubchannel(searchSubchannelArgs);
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
Expand Down
10 changes: 10 additions & 0 deletions util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ public void createPendingStream() {
delegate().createPendingStream();
}

// Forwards the delay-start notification and its reason token to the delegate tracer.
@Override
public void delayStarted(String reasonToken) {
delegate().delayStarted(reasonToken);
}

// Forwards the delay-end notification to the delegate tracer.
@Override
public void delayEnded() {
delegate().delayEnded();
}

@Override
public void outboundHeaders() {
delegate().outboundHeaders();
Expand Down
6 changes: 4 additions & 2 deletions util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
* EquivalentAddressGroup}s from the {@link NameResolver}.
*/
final class RoundRobinLoadBalancer extends MultiChildLoadBalancer {
// Shared no-result pick used for the initial picker and for pickers published in the
// CONNECTING state; it carries the "round_robin:connecting" delay reason token.
private static final PickResult CONNECTING_RESULT =
PickResult.withNoResult("round_robin:connecting");
private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt());
private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT);

public RoundRobinLoadBalancer(Helper helper) {
super(helper);
Expand All @@ -68,7 +70,7 @@ protected void updateOverallBalancingState() {
}

if (isConnecting) {
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT));
} else {
updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
public class RoundRobinLoadBalancerTest {
private static final Attributes.Key<String> MAJOR_KEY = Attributes.Key.create("major-key");
private static final SubchannelPicker EMPTY_PICKER =
new FixedResultPicker(PickResult.withNoResult());
new FixedResultPicker(PickResult.withNoResult("round_robin:connecting"));

@Rule public final MockitoRule mocks = MockitoJUnit.rule();

Expand Down
3 changes: 3 additions & 0 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
Expand Down Expand Up @@ -119,6 +120,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
errorPrefix() + "Unable to find non-dynamic cluster"));
}
// The dynamic cluster must not have loaded yet
helper.updateBalancingState(
CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending")));
return Status.OK;
}
if (!clusterConfigOr.hasValue()) {
Expand Down
Loading
Loading