Skip to content

Commit d6aa89f

Browse files
author
vongosling
committed
Polish routing parts
1 parent 08505fb commit d6aa89f

3 files changed

Lines changed: 13 additions & 25 deletions

File tree

openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ public static void main(String[] args) {
4141
//Create the source queue.
4242
resourceManager.createQueue(sourceQueue);
4343

44-
resourceManager.deDuplicate(sourceQueue, destinationQueue);
44+
resourceManager.routing(sourceQueue, destinationQueue);
45+
resourceManager.filter(destinationQueue,"name = 'kaka'");
4546

4647
//Send messages to the source queue ahead of the routing
4748
final Producer producer = messagingAccessPoint.createProducer();
4849
producer.startup();
4950

5051
Message message = producer.createMessage(sourceQueue, "RED_COLOR".getBytes());
51-
message.properties().put("color", "freen").put("shape", "round");
52+
message.properties().put("color", "green").put("shape", "round");
5253

5354
producer.send(message);
5455

openmessaging-api/src/main/java/io/openmessaging/manager/ResourceManager.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,6 @@ public interface ResourceManager {
9191
*/
9292
ListQueueResult listQueues(String nsName);
9393

94-
/**
95-
* Duplicate current queue to target queue, to After the current queue receives the message, it will be copied to
96-
* the target queue.
97-
*
98-
* @param sourceQueueName original queue, user can send message to this queue.
99-
* @param targetQueueName target queue, only receives message from original queue.
100-
* @return
101-
*/
102-
Result duplicate(String sourceQueueName, String targetQueueName);
103-
10494
/**
10595
* In order to enable consumers to get the message in the specified mode, OpenMessaging recommend use SQL
10696
* expression to filter out messages.
@@ -112,13 +102,12 @@ public interface ResourceManager {
112102
Result filter(String queueName, String filterString);
113103

114104
/**
115-
* Deduplicate current queue from sourceQueue, after this operation, <code>targetQueue</code> will no longer
116-
* receive messages sent to the source queue.
105+
* Routing from sourceQueue to targetQueue. Both queues are could be received messages after creating route action.
117106
*
118107
* @param sourceQueue source queue, process messages received from producer and duplicate those to target queue.
119108
* @param targetQueue receive messages from source queue.
120109
* @return
121110
*/
122-
Result deDuplicate(String sourceQueue, String targetQueue);
111+
Result routing(String sourceQueue, String targetQueue);
123112

124113
}

openmessaging-api/src/test/java/io/openmessaging/internal/AccessPointURITest.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package io.openmessaging.internal;
1919

20-
import io.openmessaging.Message;
2120
import io.openmessaging.exception.OMSRuntimeException;
2221
import org.junit.Test;
2322

@@ -28,11 +27,10 @@ public class AccessPointURITest {
2827
private String fullSchemaURI = "oms:rocketmq://alice@rocketmq.apache.org/us-east";
2928

3029
@Test
31-
public void testParse_DriverIsIllegal() throws Exception {
30+
public void testParse_DriverIsIllegal() {
3231
String missDriverType = "oms://alice@rocketmq.apache.org/us-east";
33-
AccessPointURI accessPointURI = null;
3432
try {
35-
accessPointURI = new AccessPointURI(missDriverType);
33+
new AccessPointURI(missDriverType);
3634
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
3735
} catch (Exception e) {
3836
assertThat(e).hasMessageContaining(String.format("The OMS driver URL [%s] is illegal.", missDriverType));
@@ -41,33 +39,33 @@ public void testParse_DriverIsIllegal() throws Exception {
4139

4240
String missRegion = "oms:rocketmq://alice@rocketmq.apache.org/";
4341
try {
44-
accessPointURI = new AccessPointURI(missRegion);
42+
new AccessPointURI(missRegion);
4543
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
4644
} catch (Exception e) {
4745
assertThat(e).hasMessageContaining(String.format("The OMS driver URL [%s] is illegal.", missRegion));
4846
}
4947
}
5048

5149
@Test
52-
public void testGetAccessPointString() throws Exception {
50+
public void testGetAccessPointString() {
5351
AccessPointURI accessPointURI = new AccessPointURI(fullSchemaURI);
5452
assertThat(accessPointURI.getAccessPointString()).isEqualTo(fullSchemaURI);
5553
}
5654

5755
@Test
58-
public void testGetDriverType() throws Exception {
56+
public void testGetDriverType() {
5957
AccessPointURI accessPointURI = new AccessPointURI(fullSchemaURI);
6058
assertThat(accessPointURI.getDriverType()).isEqualTo("rocketmq");
6159
}
6260

6361
@Test
64-
public void testGetAccountId() throws Exception {
62+
public void testGetAccountId() {
6563
AccessPointURI accessPointURI = new AccessPointURI(fullSchemaURI);
6664
assertThat(accessPointURI.getAccountId()).isEqualTo("alice");
6765
}
6866

6967
@Test
70-
public void testGetHosts() throws Exception {
68+
public void testGetHosts() {
7169
AccessPointURI accessPointURI = new AccessPointURI(fullSchemaURI);
7270
assertThat(accessPointURI.getHosts()).isEqualTo("rocketmq.apache.org");
7371

@@ -77,7 +75,7 @@ public void testGetHosts() throws Exception {
7775
}
7876

7977
@Test
80-
public void testGetRegion() throws Exception {
78+
public void testGetRegion() {
8179
AccessPointURI accessPointURI = new AccessPointURI(fullSchemaURI);
8280

8381
assertThat(accessPointURI.getRegion()).isEqualTo("us-east");

0 commit comments

Comments
 (0)