Skip to content

Commit 7139baa

Browse files
committed
binder: Add unit tests for ClientInbound's message reassembly
1 parent f77966d commit 7139baa

File tree

2 files changed

+118
-2
lines changed

2 files changed

+118
-2
lines changed

binder/src/main/java/io/grpc/binder/internal/BinderServer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
7070
private final LeakSafeOneWayBinder hostServiceBinder;
7171
private final BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker;
7272
private final InboundParcelablePolicy inboundParcelablePolicy;
73+
private final OneWayBinderProxy.Decorator clientBinderDecorator;
7374

7475
@GuardedBy("this")
7576
private ServerListener listener;
@@ -92,6 +93,7 @@ private BinderServer(Builder builder) {
9293
ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories"));
9394
this.serverPolicyChecker = BinderInternal.createPolicyChecker(builder.serverSecurityPolicy);
9495
this.inboundParcelablePolicy = builder.inboundParcelablePolicy;
96+
this.clientBinderDecorator = builder.clientBinderDecorator;
9597
hostServiceBinder = new LeakSafeOneWayBinder(this);
9698
}
9799

@@ -183,7 +185,7 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) {
183185
executorServicePool,
184186
attrsBuilder.build(),
185187
streamTracerFactories,
186-
OneWayBinderProxy.IDENTITY_DECORATOR,
188+
clientBinderDecorator,
187189
callbackBinder);
188190
transport.start(listener.transportCreated(transport));
189191
return true;
@@ -225,6 +227,7 @@ public static class Builder {
225227
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
226228
ServerSecurityPolicy serverSecurityPolicy = SecurityPolicies.serverInternalOnly();
227229
InboundParcelablePolicy inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT;
230+
OneWayBinderProxy.Decorator clientBinderDecorator = OneWayBinderProxy.IDENTITY_DECORATOR;
228231

229232
public BinderServer build() {
230233
return new BinderServer(this);
@@ -295,5 +298,19 @@ public Builder setInboundParcelablePolicy(InboundParcelablePolicy inboundParcela
295298
checkNotNull(inboundParcelablePolicy, "inboundParcelablePolicy");
296299
return this;
297300
}
301+
302+
/**
303+
* Sets the {@link OneWayBinderProxy.Decorator} to be applied to this server's "client Binders".
304+
*
305+
* <p>Tests can use this to capture post-setup transactions from server to client. The specified
306+
* decorator will be applied every time a client connects. The decorated result will be used for
307+
* all subsequent transactions to this client from the new ServerTransport.
308+
*
309+
* <p>Optional, {@link OneWayBinderProxy#IDENTITY_DECORATOR} is the default.
310+
*/
311+
public Builder setClientBinderDecorator(OneWayBinderProxy.Decorator clientBinderDecorator) {
312+
this.clientBinderDecorator = checkNotNull(clientBinderDecorator);
313+
return this;
314+
}
298315
}
299316
}

binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static android.os.IBinder.FLAG_ONEWAY;
2020
import static android.os.Process.myUid;
21-
import static com.google.common.truth.Truth.assertAbout;
2221
import static com.google.common.truth.Truth.assertThat;
2322
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2423
import static io.grpc.binder.internal.BinderTransport.REMOTE_UID;
@@ -569,6 +568,106 @@ public void msgFragmentsDeliveredToServerOutOfOrder() throws Exception {
569568
.isOk();
570569
}
571570

571+
@Test
572+
public void singleTxnMsgsDeliveredToClientOutOfOrder() throws Exception {
573+
server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build();
574+
registerServerWithRobolectric((BinderServer) server);
575+
server.start(serverListener);
576+
577+
client = newClientTransport(server);
578+
runIfNotNull(client.start(mockClientTransportListener));
579+
580+
QueueingOneWayBinderProxy queueingClientProxy =
581+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator));
582+
blockingDecorator.putNextResult(queueingClientProxy);
583+
584+
// Deliver the setup transaction without interference.
585+
queueingClientProxy.deliver(takeNextTransaction(queueingClientProxy));
586+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
587+
588+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
589+
ClientStream stream =
590+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
591+
stream.start(clientStreamListener);
592+
stream.halfClose();
593+
stream.request(2);
594+
595+
MockServerTransportListener serverTransportListener =
596+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
597+
MockServerTransportListener.StreamCreation serverStreamCreation =
598+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
599+
600+
serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("one"));
601+
serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("two"));
602+
serverStreamCreation.stream.close(Status.OK, new Metadata());
603+
604+
// Expect one transaction from the server for each message.
605+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingClientProxy);
606+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingClientProxy);
607+
QueueingOneWayBinderProxy.Transaction txClose = takeNextTransaction(queueingClientProxy);
608+
609+
// Deliver messages to the client out of order!
610+
queueingClientProxy.deliver(tx2);
611+
queueingClientProxy.deliver(tx1);
612+
queueingClientProxy.deliver(txClose);
613+
614+
// Client should deliver messages to the application in the order sent.
615+
InputStream msg1 = takeNextMessage(clientStreamListener.messageQueue);
616+
assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one");
617+
InputStream msg2 = takeNextMessage(clientStreamListener.messageQueue);
618+
assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two");
619+
620+
assertAbout(status()).that(clientStreamListener.awaitClose(TIMEOUT_MS, MILLISECONDS)).isOk();
621+
assertAbout(status())
622+
.that(serverStreamCreation.listener.awaitClose(TIMEOUT_MS, MILLISECONDS))
623+
.isOk();
624+
}
625+
626+
@Test
627+
public void msgFragmentsDeliveredToClientOutOfOrder() throws Exception {
628+
server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build();
629+
registerServerWithRobolectric((BinderServer) server);
630+
server.start(serverListener);
631+
632+
client = newClientTransport(server);
633+
runIfNotNull(client.start(mockClientTransportListener));
634+
635+
QueueingOneWayBinderProxy queueingClientProxy =
636+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator));
637+
blockingDecorator.putNextResult(queueingClientProxy);
638+
639+
// Deliver the setup transaction without interference.
640+
queueingClientProxy.deliver(takeNextTransaction(queueingClientProxy));
641+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
642+
643+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
644+
ClientStream stream =
645+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
646+
stream.start(clientStreamListener);
647+
stream.request(1);
648+
649+
MockServerTransportListener serverTransportListener =
650+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
651+
MockServerTransportListener.StreamCreation serverStreamCreation =
652+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
653+
654+
String largeMessage = newStringOfLength(BlockPool.BLOCK_SIZE + 1);
655+
serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse(largeMessage));
656+
serverStreamCreation.stream.flush();
657+
658+
// Expect the client to split largeMessage into two transactions.
659+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingClientProxy);
660+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingClientProxy);
661+
662+
// Deliver them to the client out of order!
663+
queueingClientProxy.deliver(tx2);
664+
queueingClientProxy.deliver(tx1);
665+
666+
// Client should reassemble the message correctly.
667+
InputStream msg = takeNextMessage(clientStreamListener.messageQueue);
668+
assertThat(methodDescriptor.parseResponse(msg)).isEqualTo(largeMessage);
669+
}
670+
572671
private static OneWayBinderProxy takeNextBinder(
573672
BlockingBinderDecorator<OneWayBinderProxy> decorator) throws InterruptedException {
574673
OneWayBinderProxy proxy = decorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS);

0 commit comments

Comments
 (0)