Skip to content

Commit 54a292a

Browse files
committed
binder: Add unit tests for ClientInbound's message reassembly
1 parent 63099e7 commit 54a292a

File tree

2 files changed

+111
-1
lines changed

2 files changed

+111
-1
lines changed

binder/src/main/java/io/grpc/binder/internal/BinderServer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
7070
private final LeakSafeOneWayBinder hostServiceBinder;
7171
private final BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker;
7272
private final InboundParcelablePolicy inboundParcelablePolicy;
73+
private final OneWayBinderProxy.Decorator clientBinderDecorator;
7374

7475
@GuardedBy("this")
7576
private ServerListener listener;
@@ -92,6 +93,7 @@ private BinderServer(Builder builder) {
9293
ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories"));
9394
this.serverPolicyChecker = BinderInternal.createPolicyChecker(builder.serverSecurityPolicy);
9495
this.inboundParcelablePolicy = builder.inboundParcelablePolicy;
96+
this.clientBinderDecorator = builder.clientBinderDecorator;
9597
hostServiceBinder = new LeakSafeOneWayBinder(this);
9698
}
9799

@@ -183,7 +185,7 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) {
183185
executorServicePool,
184186
attrsBuilder.build(),
185187
streamTracerFactories,
186-
OneWayBinderProxy.IDENTITY_DECORATOR,
188+
clientBinderDecorator,
187189
callbackBinder);
188190
transport.start(listener.transportCreated(transport));
189191
return true;
@@ -225,6 +227,7 @@ public static class Builder {
225227
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
226228
ServerSecurityPolicy serverSecurityPolicy = SecurityPolicies.serverInternalOnly();
227229
InboundParcelablePolicy inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT;
230+
OneWayBinderProxy.Decorator clientBinderDecorator = OneWayBinderProxy.IDENTITY_DECORATOR;
228231

229232
public BinderServer build() {
230233
return new BinderServer(this);
@@ -295,5 +298,19 @@ public Builder setInboundParcelablePolicy(InboundParcelablePolicy inboundParcela
295298
checkNotNull(inboundParcelablePolicy, "inboundParcelablePolicy");
296299
return this;
297300
}
301+
302+
/**
303+
* Sets the {@link OneWayBinderProxy.Decorator} to be applied to this server's "client Binders".
304+
*
305+
* <p>Tests can use this to capture post-setup transactions from server to client. The specified
306+
* decorator will be applied every time a client connects. The decorated result will be used for
307+
* all subsequent transactions to this client from the new ServerTransport.
308+
*
309+
* <p>Optional, {@link OneWayBinderProxy#IDENTITY_DECORATOR} is the default.
310+
*/
311+
public Builder setClientBinderDecorator(OneWayBinderProxy.Decorator clientBinderDecorator) {
312+
this.clientBinderDecorator = checkNotNull(clientBinderDecorator);
313+
return this;
314+
}
298315
}
299316
}

binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.grpc.binder.internal.SettableAsyncSecurityPolicy.AuthRequest;
6161
import io.grpc.internal.AbstractTransportTest;
6262
import io.grpc.internal.ClientStream;
63+
import io.grpc.internal.ClientStreamListenerBase;
6364
import io.grpc.internal.ClientTransport;
6465
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
6566
import io.grpc.internal.ConnectionClientTransport;
@@ -538,6 +539,98 @@ public void msgFragmentsDeliveredToServerOutOfOrder() throws Exception {
538539
assertThat(methodDescriptor.parseResponse(msg)).isEqualTo(largeMessage);
539540
}
540541

542+
@Test
543+
public void singleTxnMsgsDeliveredToClientOutOfOrder() throws Exception {
544+
server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build();
545+
registerServerWithRobolectric((BinderServer) server);
546+
server.start(serverListener);
547+
548+
client = newClientTransport(server);
549+
runIfNotNull(client.start(mockClientTransportListener));
550+
551+
QueueingOneWayBinderProxy queueingClientProxy =
552+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator));
553+
blockingDecorator.putNextResult(queueingClientProxy);
554+
555+
// Deliver the setup transaction without interference.
556+
queueingClientProxy.deliver(takeNextTransaction(queueingClientProxy));
557+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
558+
559+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
560+
ClientStream stream =
561+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
562+
stream.start(clientStreamListener);
563+
stream.request(2);
564+
565+
MockServerTransportListener serverTransportListener =
566+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
567+
MockServerTransportListener.StreamCreation streamCreation =
568+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
569+
570+
streamCreation.stream.writeMessage(methodDescriptor.streamResponse("one"));
571+
streamCreation.stream.writeMessage(methodDescriptor.streamResponse("two"));
572+
streamCreation.stream.flush();
573+
574+
// Expect one transaction from the server for each message.
575+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingClientProxy);
576+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingClientProxy);
577+
578+
// Deliver them to the client out of order!
579+
queueingClientProxy.deliver(tx2);
580+
queueingClientProxy.deliver(tx1);
581+
582+
// Client should deliver messages to the application in the order sent.
583+
InputStream msg1 = takeNextMessage(clientStreamListener.messageQueue);
584+
assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one");
585+
InputStream msg2 = takeNextMessage(clientStreamListener.messageQueue);
586+
assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two");
587+
}
588+
589+
@Test
590+
public void msgFragmentsDeliveredToClientOutOfOrder() throws Exception {
591+
server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build();
592+
registerServerWithRobolectric((BinderServer) server);
593+
server.start(serverListener);
594+
595+
client = newClientTransport(server);
596+
runIfNotNull(client.start(mockClientTransportListener));
597+
598+
QueueingOneWayBinderProxy queueingClientProxy =
599+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator));
600+
blockingDecorator.putNextResult(queueingClientProxy);
601+
602+
// Deliver the setup transaction without interference.
603+
queueingClientProxy.deliver(takeNextTransaction(queueingClientProxy));
604+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
605+
606+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
607+
ClientStream stream =
608+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
609+
stream.start(clientStreamListener);
610+
stream.request(1);
611+
612+
MockServerTransportListener serverTransportListener =
613+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
614+
MockServerTransportListener.StreamCreation streamCreation =
615+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
616+
617+
String largeMessage = newStringOfLength(BlockPool.BLOCK_SIZE + 1);
618+
streamCreation.stream.writeMessage(methodDescriptor.streamResponse(largeMessage));
619+
streamCreation.stream.flush();
620+
621+
// Expect the client to split largeMessage into two transactions.
622+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingClientProxy);
623+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingClientProxy);
624+
625+
// Deliver them to the client out of order!
626+
queueingClientProxy.deliver(tx2);
627+
queueingClientProxy.deliver(tx1);
628+
629+
// Client should reassemble the message correctly.
630+
InputStream msg = takeNextMessage(clientStreamListener.messageQueue);
631+
assertThat(methodDescriptor.parseResponse(msg)).isEqualTo(largeMessage);
632+
}
633+
541634
private static OneWayBinderProxy takeNextBinder(
542635
BlockingBinderDecorator<OneWayBinderProxy> decorator) throws InterruptedException {
543636
OneWayBinderProxy proxy = decorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS);

0 commit comments

Comments
 (0)