Indeterminate cancel or error when streaming bidirectionally #314

Open
stevenll opened this issue May 22, 2023 · 0 comments

While integration testing a gRPC service that contains a bidirectional streaming method, we observed that in a test where we expect the server to return an error, the server sporadically cancels the request (status CANCELLED) instead.

We traced this sporadic behavior to a thread that `AbstractServerStreamObserverAndPublisher` starts when it receives a cancel signal from its subscriber; that thread cancels the client stream after 100ms. Apparently, if the subscriber is running on another thread and raises an error, the error signal races the cancel thread to be the first response sent to the client.
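The race can be modeled with the JDK alone. This is a hypothetical illustration, not reactive-grpc's implementation: `CancelErrorRace`, `race`, and `Outcome` are invented names, the 100ms delay comes from the description above, and the sketch assumes that whichever terminal signal claims the call first determines the status the client sees.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the cancel/error race (not reactive-grpc code).
// Assumption: the first terminal signal to reach the call decides its status.
class CancelErrorRace {

    enum Outcome { ERROR, CANCELLED }

    // Delay of the cancel thread, as described in this issue.
    static final long CANCEL_DELAY_MS = 100;

    // The subscriber has already cancelled at t = 0. The cancel timer fires at
    // CANCEL_DELAY_MS, while the error reaches the call after errorDelayMs.
    static Outcome race(long errorDelayMs) {
        AtomicReference<Outcome> status = new AtomicReference<>();
        CountDownLatch bothFired = new CountDownLatch(2);
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        try {
            pool.schedule(() -> {
                status.compareAndSet(null, Outcome.CANCELLED); // no-op if the error won
                bothFired.countDown();
            }, CANCEL_DELAY_MS, TimeUnit.MILLISECONDS);
            pool.schedule(() -> {
                status.compareAndSet(null, Outcome.ERROR); // no-op if the cancel won
                bothFired.countDown();
            }, errorDelayMs, TimeUnit.MILLISECONDS);
            bothFired.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return status.get();
    }

    public static void main(String[] args) {
        System.out.println(race(0));   // error delivered well before the 100ms timer
        System.out.println(race(500)); // 500ms error delay, longer than the timer
    }
}
```

With no error delay the error claims the status first; with a 500ms delay, matching the `doOnError` sleep in the reproduction in this issue, the cancel timer wins.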

We can reproduce the faulty behavior consistently by deliberately introducing a delay in the subscriber when it emits an error. Consider a simple gRPC service:

```proto
syntax = "proto3";

message T {
  string data = 1;
}

service TestService {
  rpc bidirectionalMethod(stream T) returns (stream T);
}
```

The TestService implementation below introduces a 500ms delay between the cancel signal and the error signal:

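```java
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService; // assumed: grpc-spring-boot-starter
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
// T and ReactorTestServiceGrpc are generated from the .proto above by reactive-grpc.

@Slf4j
@GrpcService
public class TestService extends ReactorTestServiceGrpc.TestServiceImplBase {

  @Override
  public Flux<T> bidirectionalMethod(Flux<T> request) {
    return request
        .log()
        .subscribeOn(Schedulers.boundedElastic())
        .map(T::getData)
        .map(
            data -> {
              if (data.equals("error")) {
                // Throwing inside map makes Reactor cancel the upstream
                // request before the error is signaled downstream.
                throwError();
              }
              return data + "!";
            })
        .map(data -> T.newBuilder().setData(data).build())
        // Deliberate 500ms delay: the error reaches the gRPC layer only after
        // the 100ms cancel timer has already fired.
        .doOnError(throwable -> Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS));
  }

  private void throwError() {
    throw new RuntimeException("error error error");
  }
}
```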

This consistently causes the server to respond with `io.grpc.StatusRuntimeException: CANCELLED: Server canceled request` rather than the intended error.

How can we change this code so that the service always responds to the client with the intended error, even when the subscriber thread is delayed by more than 100ms? We don't see any direct way to control the cancellation timer.
