Skip to content

Commit bb60ae8

Browse files
committed
binder: Add unit tests for ServerInbound message reassembly
1 parent 24d9db1 commit bb60ae8

File tree

4 files changed

+140
-3
lines changed

4 files changed

+140
-3
lines changed

binder/src/main/java/io/grpc/binder/internal/BlockPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ final class BlockPool {
4040
* The size of each standard block. (Currently 16k) The block size must be at least as large as
4141
* the maximum header list size.
4242
*/
43-
private static final int BLOCK_SIZE = Math.max(16 * 1024, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
43+
static final int BLOCK_SIZE = Math.max(16 * 1024, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
4444

4545
/**
4646
* Maximum number of blocks to keep around. (Max 128k). This limit is a judgement call. 128k is

binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,19 @@
4747
import com.google.common.collect.ImmutableList;
4848
import com.google.common.truth.TruthJUnit;
4949
import io.grpc.Attributes;
50+
import io.grpc.CallOptions;
5051
import io.grpc.InternalChannelz.SocketStats;
52+
import io.grpc.Metadata;
5153
import io.grpc.ServerStreamTracer;
5254
import io.grpc.Status;
5355
import io.grpc.binder.AndroidComponentAddress;
5456
import io.grpc.binder.ApiConstants;
5557
import io.grpc.binder.AsyncSecurityPolicy;
5658
import io.grpc.binder.SecurityPolicies;
59+
import io.grpc.binder.internal.OneWayBinderProxies.*;
5760
import io.grpc.binder.internal.SettableAsyncSecurityPolicy.AuthRequest;
5861
import io.grpc.internal.AbstractTransportTest;
62+
import io.grpc.internal.ClientStream;
5963
import io.grpc.internal.ClientTransport;
6064
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
6165
import io.grpc.internal.ConnectionClientTransport;
@@ -66,7 +70,9 @@
6670
import io.grpc.internal.MockServerTransportListener;
6771
import io.grpc.internal.ObjectPool;
6872
import io.grpc.internal.SharedResourcePool;
73+
import java.io.InputStream;
6974
import java.util.List;
75+
import java.util.concurrent.BlockingQueue;
7076
import java.util.concurrent.Executor;
7177
import java.util.concurrent.ScheduledExecutorService;
7278
import org.junit.Before;
@@ -124,6 +130,8 @@ public final class RobolectricBinderTransportTest extends AbstractTransportTest
124130
ServiceInfo serviceInfo;
125131

126132
private int nextServerAddress;
133+
private BlockingBinderDecorator<OneWayBinderProxy> blockingDecorator =
134+
new BlockingBinderDecorator<>();
127135

128136
@Parameter(value = 0)
129137
public boolean preAuthServersParam;
@@ -433,4 +441,120 @@ public void flowControlPushBack() {}
433441
@Ignore("See BinderTransportTest#serverAlreadyListening")
434442
@Override
435443
public void serverAlreadyListening() {}
444+
445+
@Test
446+
public void singleTxnMsgsDeliveredToServerOutOfOrder() throws Exception {
447+
server.start(serverListener);
448+
client =
449+
newClientTransportBuilder()
450+
.setFactory(
451+
newClientTransportFactoryBuilder()
452+
.setBinderDecorator(blockingDecorator)
453+
.buildClientTransportFactory())
454+
.build();
455+
runIfNotNull(client.start(mockClientTransportListener));
456+
blockingDecorator.putNextResult(takeNextBinder(blockingDecorator)); // Endpoint binder.
457+
QueueingOneWayBinderProxy queueingServerProxy =
458+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); // Server binder.
459+
blockingDecorator.putNextResult(queueingServerProxy);
460+
461+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
462+
463+
ClientStream stream =
464+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
465+
stream.writeMessage(methodDescriptor.streamRequest("one"));
466+
stream.writeMessage(methodDescriptor.streamRequest("two"));
467+
stream.flush();
468+
469+
// Expect one transaction from the client for each message.
470+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingServerProxy);
471+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingServerProxy);
472+
473+
// Deliver them to the server out of order!
474+
queueingServerProxy.deliver(tx2);
475+
queueingServerProxy.deliver(tx1);
476+
477+
MockServerTransportListener serverTransportListener =
478+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
479+
MockServerTransportListener.StreamCreation streamCreation =
480+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
481+
streamCreation.stream.request(2);
482+
483+
// Expect the server to deliver the messages in the order they were originally sent.
484+
InputStream msg1 = takeNextMessage(streamCreation.listener.messageQueue);
485+
assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one");
486+
487+
InputStream msg2 = takeNextMessage(streamCreation.listener.messageQueue);
488+
assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two");
489+
}
490+
491+
@Test
492+
public void msgFragmentsDeliveredToServerOutOfOrder() throws Exception {
493+
server.start(serverListener);
494+
client =
495+
newClientTransportBuilder()
496+
.setFactory(
497+
newClientTransportFactoryBuilder()
498+
.setBinderDecorator(blockingDecorator)
499+
.buildClientTransportFactory())
500+
.build();
501+
runIfNotNull(client.start(mockClientTransportListener));
502+
blockingDecorator.putNextResult(takeNextBinder(blockingDecorator)); // Endpoint binder.
503+
QueueingOneWayBinderProxy queueingServerProxy =
504+
new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); // Server binder.
505+
blockingDecorator.putNextResult(queueingServerProxy);
506+
507+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
508+
509+
ClientStream stream =
510+
client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers);
511+
512+
String largeMessage = newStringOfLength(BlockPool.BLOCK_SIZE + 1);
513+
stream.writeMessage(methodDescriptor.streamRequest(largeMessage));
514+
stream.flush();
515+
516+
// Expect the client to split largeMessage into two transactions.
517+
QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingServerProxy);
518+
QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingServerProxy);
519+
520+
// Deliver them out of order!
521+
queueingServerProxy.deliver(tx2);
522+
queueingServerProxy.deliver(tx1);
523+
524+
// Verify that the server reassembles the transactions correctly.
525+
MockServerTransportListener serverTransportListener =
526+
serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS);
527+
MockServerTransportListener.StreamCreation streamCreation =
528+
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS);
529+
streamCreation.stream.request(1);
530+
InputStream msg = takeNextMessage(streamCreation.listener.messageQueue);
531+
assertThat(methodDescriptor.parseResponse(msg)).isEqualTo(largeMessage);
532+
}
533+
534+
private static OneWayBinderProxy takeNextBinder(
535+
BlockingBinderDecorator<OneWayBinderProxy> decorator) throws InterruptedException {
536+
OneWayBinderProxy proxy = decorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS);
537+
assertThat(proxy).isNotNull();
538+
return proxy;
539+
}
540+
541+
private static QueueingOneWayBinderProxy.Transaction takeNextTransaction(
542+
QueueingOneWayBinderProxy proxy) throws InterruptedException {
543+
QueueingOneWayBinderProxy.Transaction tx = proxy.pollNextTransaction(TIMEOUT_MS, MILLISECONDS);
544+
assertThat(tx).isNotNull();
545+
return tx;
546+
}
547+
548+
private static InputStream takeNextMessage(BlockingQueue<InputStream> messageQueue)
549+
throws InterruptedException {
550+
InputStream msg = messageQueue.poll(TIMEOUT_MS, MILLISECONDS);
551+
assertThat(msg).isNotNull();
552+
return msg;
553+
}
554+
555+
private static String newStringOfLength(int numChars) {
556+
char[] chars = new char[numChars];
557+
java.util.Arrays.fill(chars, 'x');
558+
return new String(chars);
559+
}
436560
}

binder/src/testFixtures/java/io/grpc/binder/internal/OneWayBinderProxies.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import android.os.RemoteException;
1919
import java.util.concurrent.BlockingQueue;
2020
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.TimeUnit;
2122
import javax.annotation.Nullable;
2223

2324
/** A collection of {@link OneWayBinderProxy}-related test helpers. */
@@ -42,6 +43,18 @@ public OneWayBinderProxy takeNextRequest() throws InterruptedException {
4243
return requests.take();
4344
}
4445

46+
/**
47+
* Returns the next {@link OneWayBinderProxy} that needs decorating, blocking for up to the
48+
* specified timeout if it hasn't yet been provided to {@link #decorate}.
49+
*
50+
* <p>Follow this with a call to {@link #putNextResult(OneWayBinderProxy)} to provide the result
51+
* of {@link #decorate} and unblock the waiting caller.
52+
*/
53+
public OneWayBinderProxy takeNextRequest(long timeout, TimeUnit unit)
54+
throws InterruptedException {
55+
return requests.poll(timeout, unit);
56+
}
57+
4558
/** Provides the next value to return from {@link #decorate}. */
4659
public void putNextResult(T next) throws InterruptedException {
4760
results.put(next);

core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
185185
protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
186186
clientStreamTracer1, clientStreamTracer2
187187
};
188-
private final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] {
188+
protected final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] {
189189
new ClientStreamTracer() {}
190190
};
191191

@@ -2195,7 +2195,7 @@ public void streamCreated(Attributes transportAttrs, Metadata metadata) {
21952195
}
21962196
}
21972197

2198-
private static class StringMarshaller implements MethodDescriptor.Marshaller<String> {
2198+
protected static class StringMarshaller implements MethodDescriptor.Marshaller<String> {
21992199
public static final StringMarshaller INSTANCE = new StringMarshaller();
22002200

22012201
@Override

0 commit comments

Comments
 (0)