Skip to content

Commit 95fac1b

Browse files
committed
binder: Add unit tests for ServerInbound message reassembly
1 parent 10fc91c commit 95fac1b

File tree

5 files changed

+172
-3
lines changed

5 files changed

+172
-3
lines changed

binder/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dependencies {
5454
testImplementation project(':grpc-testing')
5555
testImplementation project(':grpc-inprocess')
5656
testImplementation testFixtures(project(':grpc-core'))
57+
testImplementation testFixtures(project(':grpc-api'))
5758

5859
androidTestAnnotationProcessor libraries.auto.value
5960
androidTestImplementation project(':grpc-testing')

binder/src/main/java/io/grpc/binder/internal/BlockPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ final class BlockPool {
4040
* The size of each standard block. (Currently 16k) The block size must be at least as large as
4141
* the maximum header list size.
4242
*/
43-
private static final int BLOCK_SIZE = Math.max(16 * 1024, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
43+
static final int BLOCK_SIZE = Math.max(16 * 1024, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
4444

4545
/**
4646
* Maximum number of blocks to keep around. (Max 128k). This limit is a judgement call. 128k is

binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818

1919
import static android.os.IBinder.FLAG_ONEWAY;
2020
import static android.os.Process.myUid;
21+
import static com.google.common.truth.Truth.assertAbout;
2122
import static com.google.common.truth.Truth.assertThat;
2223
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2324
import static io.grpc.binder.internal.BinderTransport.REMOTE_UID;
2425
import static io.grpc.binder.internal.BinderTransport.SETUP_TRANSPORT;
2526
import static io.grpc.binder.internal.BinderTransport.SHUTDOWN_TRANSPORT;
2627
import static io.grpc.binder.internal.BinderTransport.WIRE_FORMAT_VERSION;
28+
import static io.grpc.testing.StatusSubject.status;
2729
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2830
import static org.junit.Assume.assumeTrue;
2931
import static org.mockito.ArgumentMatchers.any;
@@ -47,15 +49,20 @@
4749
import com.google.common.collect.ImmutableList;
4850
import com.google.common.truth.TruthJUnit;
4951
import io.grpc.Attributes;
52+
import io.grpc.CallOptions;
5053
import io.grpc.InternalChannelz.SocketStats;
54+
import io.grpc.Metadata;
5155
import io.grpc.ServerStreamTracer;
5256
import io.grpc.Status;
5357
import io.grpc.binder.AndroidComponentAddress;
5458
import io.grpc.binder.ApiConstants;
5559
import io.grpc.binder.AsyncSecurityPolicy;
5660
import io.grpc.binder.SecurityPolicies;
61+
import io.grpc.binder.internal.OneWayBinderProxies.*;
5762
import io.grpc.binder.internal.SettableAsyncSecurityPolicy.AuthRequest;
5863
import io.grpc.internal.AbstractTransportTest;
64+
import io.grpc.internal.ClientStream;
65+
import io.grpc.internal.ClientStreamListenerBase;
5966
import io.grpc.internal.ClientTransport;
6067
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
6168
import io.grpc.internal.ConnectionClientTransport;
@@ -66,7 +73,9 @@
6673
import io.grpc.internal.MockServerTransportListener;
6774
import io.grpc.internal.ObjectPool;
6875
import io.grpc.internal.SharedResourcePool;
76+
import java.io.InputStream;
6977
import java.util.List;
78+
import java.util.concurrent.BlockingQueue;
7079
import java.util.concurrent.Executor;
7180
import java.util.concurrent.ScheduledExecutorService;
7281
import org.junit.Before;
@@ -124,6 +133,8 @@ public final class RobolectricBinderTransportTest extends AbstractTransportTest
124133
ServiceInfo serviceInfo;
125134

126135
private int nextServerAddress;
136+
private BlockingBinderDecorator<OneWayBinderProxy> blockingDecorator =
137+
new BlockingBinderDecorator<>();
127138

128139
@Parameter(value = 0)
129140
public boolean preAuthServersParam;
@@ -433,4 +444,148 @@ public void flowControlPushBack() {}
433444
@Ignore("See BinderTransportTest#serverAlreadyListening")
434445
@Override
435446
public void serverAlreadyListening() {}
447+
448+
@Test
449+
public void singleTxnMsgsDeliveredToServerOutOfOrder() throws Exception {
450+
server.start(serverListener);
451+
client =
452+
newClientTransportBuilder()
453+
.setFactory(
454+
newClientTransportFactoryBuilder()
455+
.setBinderDecorator(blockingDecorator)
456+
.buildClientTransportFactory())
457+
.build();
458+
runIfNotNull(client.start(mockClientTransportListener));
459+
blockingDecorator.putNextResult(takeNextBinder(blockingDecorator)); // Endpoint binder.
460+
QueueingOneWayBinderProxy queueingServerProxy =
461+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); // Server binder.
462+
blockingDecorator.putNextResult(queueingServerProxy);
463+
464+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
465+
466+
ClientStream stream =
467+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
468+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
469+
stream.start(clientStreamListener);
470+
stream.writeMessage(methodDescriptor.streamRequest("one"));
471+
stream.writeMessage(methodDescriptor.streamRequest("two"));
472+
stream.halfClose();
473+
474+
// Expect one transaction for headers, one for each message, and one for half-close.
475+
QueueingOneWayBinderProxy.Transaction txHeaders = takeNextTransaction(queueingServerProxy);
476+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingServerProxy);
477+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingServerProxy);
478+
QueueingOneWayBinderProxy.Transaction txHalfClose = takeNextTransaction(queueingServerProxy);
479+
480+
// Deliver messages out of order!
481+
queueingServerProxy.deliver(txHeaders);
482+
queueingServerProxy.deliver(tx2);
483+
queueingServerProxy.deliver(tx1);
484+
queueingServerProxy.deliver(txHalfClose);
485+
486+
MockServerTransportListener serverTransportListener =
487+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
488+
MockServerTransportListener.StreamCreation serverStreamCreation =
489+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
490+
serverStreamCreation.stream.request(2);
491+
492+
// Expect the server to deliver the messages in the order they were originally sent.
493+
InputStream msg1 = takeNextMessage(serverStreamCreation.listener.messageQueue);
494+
assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one");
495+
496+
InputStream msg2 = takeNextMessage(serverStreamCreation.listener.messageQueue);
497+
assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two");
498+
499+
assertThat(serverStreamCreation.listener.awaitHalfClosed(TIMEOUT_MS, MILLISECONDS)).isTrue();
500+
serverStreamCreation.stream.close(Status.OK, new Metadata());
501+
502+
assertAbout(status()).that(clientStreamListener.awaitClose(TIMEOUT_MS, MILLISECONDS)).isOk();
503+
assertAbout(status())
504+
.that(serverStreamCreation.listener.awaitClose(TIMEOUT_MS, MILLISECONDS))
505+
.isOk();
506+
}
507+
508+
@Test
509+
public void msgFragmentsDeliveredToServerOutOfOrder() throws Exception {
510+
server.start(serverListener);
511+
client =
512+
newClientTransportBuilder()
513+
.setFactory(
514+
newClientTransportFactoryBuilder()
515+
.setBinderDecorator(blockingDecorator)
516+
.buildClientTransportFactory())
517+
.build();
518+
runIfNotNull(client.start(mockClientTransportListener));
519+
blockingDecorator.putNextResult(takeNextBinder(blockingDecorator)); // Endpoint binder.
520+
QueueingOneWayBinderProxy queueingServerProxy =
521+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); // Server binder.
522+
blockingDecorator.putNextResult(queueingServerProxy);
523+
524+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
525+
526+
ClientStream stream =
527+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
528+
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
529+
stream.start(clientStreamListener);
530+
531+
String largeMessage = newStringOfLength(BlockPool.BLOCK_SIZE + 1);
532+
stream.writeMessage(methodDescriptor.streamRequest(largeMessage));
533+
stream.halfClose();
534+
535+
// Expect the client to split largeMessage into two transactions, plus headers and half-close.
536+
QueueingOneWayBinderProxy.Transaction txHeaders = takeNextTransaction(queueingServerProxy);
537+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingServerProxy);
538+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingServerProxy);
539+
QueueingOneWayBinderProxy.Transaction txHalfClose = takeNextTransaction(queueingServerProxy);
540+
541+
// Deliver fragments out of order!
542+
queueingServerProxy.deliver(txHeaders);
543+
queueingServerProxy.deliver(tx2);
544+
queueingServerProxy.deliver(tx1);
545+
queueingServerProxy.deliver(txHalfClose);
546+
547+
// Verify that the server reassembles the transactions correctly.
548+
MockServerTransportListener serverTransportListener =
549+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
550+
MockServerTransportListener.StreamCreation serverStreamCreation =
551+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
552+
serverStreamCreation.stream.request(1);
553+
InputStream msg = takeNextMessage(serverStreamCreation.listener.messageQueue);
554+
assertThat(methodDescriptor.parseResponse(msg)).isEqualTo(largeMessage);
555+
556+
assertThat(serverStreamCreation.listener.awaitHalfClosed(TIMEOUT_MS, MILLISECONDS)).isTrue();
557+
serverStreamCreation.stream.close(Status.OK, new Metadata());
558+
559+
assertAbout(status()).that(clientStreamListener.awaitClose(TIMEOUT_MS, MILLISECONDS)).isOk();
560+
assertAbout(status())
561+
.that(serverStreamCreation.listener.awaitClose(TIMEOUT_MS, MILLISECONDS))
562+
.isOk();
563+
}
564+
565+
private static OneWayBinderProxy takeNextBinder(
566+
BlockingBinderDecorator<OneWayBinderProxy> decorator) throws InterruptedException {
567+
OneWayBinderProxy proxy = decorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS);
568+
assertThat(proxy).isNotNull();
569+
return proxy;
570+
}
571+
572+
private static QueueingOneWayBinderProxy.Transaction takeNextTransaction(
573+
QueueingOneWayBinderProxy proxy) throws InterruptedException {
574+
QueueingOneWayBinderProxy.Transaction tx = proxy.pollNextTransaction(TIMEOUT_MS, MILLISECONDS);
575+
assertThat(tx).isNotNull();
576+
return tx;
577+
}
578+
579+
private static InputStream takeNextMessage(BlockingQueue<InputStream> messageQueue)
580+
throws InterruptedException {
581+
InputStream msg = messageQueue.poll(TIMEOUT_MS, MILLISECONDS);
582+
assertThat(msg).isNotNull();
583+
return msg;
584+
}
585+
586+
private static String newStringOfLength(int numChars) {
587+
char[] chars = new char[numChars];
588+
java.util.Arrays.fill(chars, 'x');
589+
return new String(chars);
590+
}
436591
}

binder/src/testFixtures/java/io/grpc/binder/internal/OneWayBinderProxies.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import android.os.RemoteException;
1919
import java.util.concurrent.BlockingQueue;
2020
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.TimeUnit;
2122
import javax.annotation.Nullable;
2223

2324
/** A collection of {@link OneWayBinderProxy}-related test helpers. */
@@ -42,6 +43,18 @@ public OneWayBinderProxy takeNextRequest() throws InterruptedException {
4243
return requests.take();
4344
}
4445

46+
/**
47+
* Returns the next {@link OneWayBinderProxy} that needs decorating, blocking for up to the
48+
* specified timeout if it hasn't yet been provided to {@link #decorate}.
49+
*
50+
* <p>Follow this with a call to {@link #putNextResult(OneWayBinderProxy)} to provide the result
51+
* of {@link #decorate} and unblock the waiting caller.
52+
*/
53+
public OneWayBinderProxy takeNextRequest(long timeout, TimeUnit unit)
54+
throws InterruptedException {
55+
return requests.poll(timeout, unit);
56+
}
57+
4558
/** Provides the next value to return from {@link #decorate}. */
4659
public void putNextResult(T next) throws InterruptedException {
4760
results.put(next);

core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
185185
protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
186186
clientStreamTracer1, clientStreamTracer2
187187
};
188-
private final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] {
188+
protected final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] {
189189
new ClientStreamTracer() {}
190190
};
191191

@@ -2195,7 +2195,7 @@ public void streamCreated(Attributes transportAttrs, Metadata metadata) {
21952195
}
21962196
}
21972197

2198-
private static class StringMarshaller implements MethodDescriptor.Marshaller<String> {
2198+
protected static class StringMarshaller implements MethodDescriptor.Marshaller<String> {
21992199
public static final StringMarshaller INSTANCE = new StringMarshaller();
22002200

22012201
@Override

0 commit comments

Comments
 (0)