4848import io .grpc .SecurityLevel ;
4949import io .grpc .ServerCall ;
5050import io .grpc .Status ;
51+ import io .grpc .StatusRuntimeException ;
5152import io .grpc .internal .ServerCallImpl .ServerStreamListenerImpl ;
5253import io .perfmark .PerfMark ;
5354import java .io .ByteArrayInputStream ;
55+ import java .io .IOException ;
5456import java .io .InputStream ;
5557import java .io .InputStreamReader ;
5658import org .junit .Before ;
@@ -69,6 +71,8 @@ public class ServerCallImplTest {
6971
7072 @ Mock private ServerStream stream ;
7173 @ Mock private ServerCall .Listener <Long > callListener ;
74+ @ Mock private StreamListener .MessageProducer messageProducer ;
75+ @ Mock private InputStream message ;
7276
7377 private final CallTracer serverCallTracer = CallTracer .getDefaultFactory ().create ();
7478 private ServerCallImpl <Long , Long > call ;
@@ -493,6 +497,44 @@ public void streamListener_unexpectedRuntimeException() {
493497 assertThat (e ).hasMessageThat ().isEqualTo ("unexpected exception" );
494498 }
495499
500+ @ Test
501+ public void streamListener_statusRuntimeException () throws IOException {
502+ MethodDescriptor <Long , Long > failingParseMethod = MethodDescriptor .<Long , Long >newBuilder ()
503+ .setType (MethodType .UNARY )
504+ .setFullMethodName ("service/method" )
505+ .setRequestMarshaller (new LongMarshaller () {
506+ @ Override
507+ public Long parse (InputStream stream ) {
508+ throw new StatusRuntimeException (Status .RESOURCE_EXHAUSTED
509+ .withDescription ("Decompressed gRPC message exceeds maximum size" ));
510+ }
511+ })
512+ .setResponseMarshaller (new LongMarshaller ())
513+ .build ();
514+
515+ call = new ServerCallImpl <>(stream , failingParseMethod , requestHeaders , context ,
516+ DecompressorRegistry .getDefaultInstance (), CompressorRegistry .getDefaultInstance (),
517+ serverCallTracer , PerfMark .createTag ());
518+
519+ ServerStreamListenerImpl <Long > streamListener =
520+ new ServerCallImpl .ServerStreamListenerImpl <>(call , callListener , context );
521+
522+ when (messageProducer .next ()).thenReturn (message , (InputStream ) null );
523+ streamListener .messagesAvailable (messageProducer );
524+ ArgumentCaptor <Status > statusCaptor = ArgumentCaptor .forClass (Status .class );
525+ verify (stream ).cancel (statusCaptor .capture ());
526+ Status status = statusCaptor .getValue ();
527+ assertEquals (Status .Code .RESOURCE_EXHAUSTED , status .getCode ());
528+ assertEquals ("Decompressed gRPC message exceeds maximum size" , status .getDescription ());
529+
530+ streamListener .halfClosed ();
531+ verify (callListener , never ()).onHalfClose ();
532+
533+ when (messageProducer .next ()).thenReturn (message , (InputStream ) null );
534+ streamListener .messagesAvailable (messageProducer );
535+ verify (callListener , never ()).onMessage (any ());
536+ }
537+
496538 private static class LongMarshaller implements Marshaller <Long > {
497539 @ Override
498540 public InputStream stream (Long value ) {
0 commit comments