|
47 | 47 | import com.google.common.collect.ImmutableList; |
48 | 48 | import com.google.common.truth.TruthJUnit; |
49 | 49 | import io.grpc.Attributes; |
| 50 | +import io.grpc.CallOptions; |
50 | 51 | import io.grpc.InternalChannelz.SocketStats; |
| 52 | +import io.grpc.Metadata; |
51 | 53 | import io.grpc.ServerStreamTracer; |
52 | 54 | import io.grpc.Status; |
53 | 55 | import io.grpc.binder.AndroidComponentAddress; |
54 | 56 | import io.grpc.binder.ApiConstants; |
55 | 57 | import io.grpc.binder.AsyncSecurityPolicy; |
56 | 58 | import io.grpc.binder.SecurityPolicies; |
| 59 | +import io.grpc.binder.internal.OneWayBinderProxies.*; |
57 | 60 | import io.grpc.binder.internal.SettableAsyncSecurityPolicy.AuthRequest; |
58 | 61 | import io.grpc.internal.AbstractTransportTest; |
| 62 | +import io.grpc.internal.ClientStream; |
59 | 63 | import io.grpc.internal.ClientTransport; |
60 | 64 | import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; |
61 | 65 | import io.grpc.internal.ConnectionClientTransport; |
|
66 | 70 | import io.grpc.internal.MockServerTransportListener; |
67 | 71 | import io.grpc.internal.ObjectPool; |
68 | 72 | import io.grpc.internal.SharedResourcePool; |
| 73 | +import java.io.InputStream; |
69 | 74 | import java.util.List; |
70 | 75 | import java.util.concurrent.Executor; |
71 | 76 | import java.util.concurrent.ScheduledExecutorService; |
@@ -124,6 +129,8 @@ public final class RobolectricBinderTransportTest extends AbstractTransportTest |
124 | 129 | ServiceInfo serviceInfo; |
125 | 130 |
|
126 | 131 | private int nextServerAddress; |
| 132 | + private BlockingBinderDecorator<OneWayBinderProxy> blockingDecorator = |
| 133 | + new BlockingBinderDecorator<>(); |
127 | 134 |
|
128 | 135 | @Parameter(value = 0) |
129 | 136 | public boolean preAuthServersParam; |
@@ -433,4 +440,116 @@ public void flowControlPushBack() {} |
433 | 440 | @Ignore("See BinderTransportTest#serverAlreadyListening") |
434 | 441 | @Override |
435 | 442 | public void serverAlreadyListening() {} |
| 443 | + |
| 444 | + @Test |
| 445 | + public void singleTxnMsgsDeliveredToServerOutOfOrder() throws Exception { |
| 446 | + server.start(serverListener); |
| 447 | + client = newClientTransportBuilder() |
| 448 | + .setFactory( |
| 449 | + newClientTransportFactoryBuilder() |
| 450 | + .setBinderDecorator(blockingDecorator) |
| 451 | + .buildClientTransportFactory()) |
| 452 | + .build(); |
| 453 | + |
| 454 | + runIfNotNull(client.start(mockClientTransportListener)); |
| 455 | + |
| 456 | + OneWayBinderProxy endpointProxy = blockingDecorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS); |
| 457 | + assertThat(endpointProxy).isNotNull(); |
| 458 | + blockingDecorator.putNextResult(endpointProxy); |
| 459 | + |
| 460 | + OneWayBinderProxy serverProxy = blockingDecorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS); |
| 461 | + assertThat(serverProxy).isNotNull(); |
| 462 | + QueueingOneWayBinderProxy proxy = new QueueingOneWayBinderProxy(serverProxy); |
| 463 | + blockingDecorator.putNextResult(proxy); |
| 464 | + |
| 465 | + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); |
| 466 | + |
| 467 | + ClientStream stream = client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers); |
| 468 | + stream.writeMessage(methodDescriptor.streamRequest("one")); |
| 469 | + stream.writeMessage(methodDescriptor.streamRequest("two")); |
| 470 | + stream.flush(); |
| 471 | + |
| 472 | + // We expect at least two transactions for the messages. |
| 473 | + QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(proxy); |
| 474 | + QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(proxy); |
| 475 | + |
| 476 | + // Deliver them out of order! |
| 477 | + proxy.deliver(tx2); |
| 478 | + proxy.deliver(tx1); |
| 479 | + |
| 480 | + // Verify that the server receives them in order. |
| 481 | + MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS); |
| 482 | + MockServerTransportListener.StreamCreation streamCreation = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, |
| 483 | + MILLISECONDS); |
| 484 | + streamCreation.stream.request(2); |
| 485 | + |
| 486 | + InputStream msg1 = streamCreation.listener.messageQueue.poll(TIMEOUT_MS, MILLISECONDS); |
| 487 | + assertThat(msg1).isNotNull(); |
| 488 | + assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one"); |
| 489 | + |
| 490 | + InputStream msg2 = streamCreation.listener.messageQueue.poll(TIMEOUT_MS, MILLISECONDS); |
| 491 | + assertThat(msg2).isNotNull(); |
| 492 | + assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two"); |
| 493 | + } |
| 494 | + |
| 495 | + @Test |
| 496 | + public void msgFragmentsDeliveredToServerOutOfOrder() throws Exception { |
| 497 | + server.start(serverListener); |
| 498 | + client = newClientTransportBuilder() |
| 499 | + .setFactory( |
| 500 | + newClientTransportFactoryBuilder() |
| 501 | + .setBinderDecorator(blockingDecorator) |
| 502 | + .buildClientTransportFactory()) |
| 503 | + .build(); |
| 504 | + |
| 505 | + runIfNotNull(client.start(mockClientTransportListener)); |
| 506 | + |
| 507 | + OneWayBinderProxy endpointProxy = blockingDecorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS); |
| 508 | + assertThat(endpointProxy).isNotNull(); |
| 509 | + blockingDecorator.putNextResult(endpointProxy); |
| 510 | + |
| 511 | + OneWayBinderProxy serverProxy = blockingDecorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS); |
| 512 | + assertThat(serverProxy).isNotNull(); |
| 513 | + QueueingOneWayBinderProxy proxy = new QueueingOneWayBinderProxy(serverProxy); |
| 514 | + blockingDecorator.putNextResult(proxy); |
| 515 | + |
| 516 | + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); |
| 517 | + |
| 518 | + ClientStream stream = client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers); |
| 519 | + |
| 520 | + String largeMessage = newStringOfLength(20 * 1024); |
| 521 | + stream.writeMessage(methodDescriptor.streamRequest(largeMessage)); |
| 522 | + stream.flush(); |
| 523 | + |
| 524 | + // We expect at least two transactions for the large message. |
| 525 | + QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(proxy); |
| 526 | + QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(proxy); |
| 527 | + |
| 528 | + // Deliver them out of order! |
| 529 | + proxy.deliver(tx2); |
| 530 | + proxy.deliver(tx1); |
| 531 | + |
| 532 | + // Verify that the server receives the complete message correctly. |
| 533 | + MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS); |
| 534 | + MockServerTransportListener.StreamCreation streamCreation = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, |
| 535 | + MILLISECONDS); |
| 536 | + streamCreation.stream.request(1); |
| 537 | + |
| 538 | + InputStream msgStream = streamCreation.listener.messageQueue.poll(TIMEOUT_MS, MILLISECONDS); |
| 539 | + assertThat(msgStream).isNotNull(); |
| 540 | + assertThat(methodDescriptor.parseResponse(msgStream)).isEqualTo(largeMessage); |
| 541 | + } |
| 542 | + |
| 543 | + private static QueueingOneWayBinderProxy.Transaction takeNextTransaction( |
| 544 | + QueueingOneWayBinderProxy proxy) throws InterruptedException { |
| 545 | + QueueingOneWayBinderProxy.Transaction tx = proxy.pollNextTransaction(TIMEOUT_MS, MILLISECONDS); |
| 546 | + assertThat(tx).isNotNull(); |
| 547 | + return tx; |
| 548 | + } |
| 549 | + |
| 550 | + private static String newStringOfLength(int numChars) { |
| 551 | + char[] chars = new char[numChars]; |
| 552 | + java.util.Arrays.fill(chars, 'x'); |
| 553 | + return new String(chars); |
| 554 | + } |
436 | 555 | } |
0 commit comments